• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3535

23 Nov 2024 02:07AM UTC coverage: 60.85% (+0.03%) from 60.825%
#3535

push

travis-ci

web-flow
Merge pull request #28893 from taosdata/doc/internal

refact: rename taos lib name

120252 of 252737 branches covered (47.58%)

Branch coverage included in aggregate %.

201187 of 275508 relevant lines covered (73.02%)

15886166.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.59
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "scheduler.h"
22
#include "tdatablock.h"
23
#include "tdataformat.h"
24
#include "tdef.h"
25
#include "tglobal.h"
26
#include "tmsgtype.h"
27
#include "tpagedbuf.h"
28
#include "tref.h"
29
#include "tsched.h"
30
#include "tversion.h"
31
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
32
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
33

34
void setQueryRequest(int64_t rId) {
1,477,646✔
35
  SRequestObj* pReq = acquireRequest(rId);
1,477,646✔
36
  if (pReq != NULL) {
1,477,653✔
37
    pReq->isQuery = true;
1,477,637✔
38
    (void)releaseRequest(rId);
1,477,637✔
39
  }
40
}
1,477,650✔
41

42
static bool stringLengthCheck(const char* str, size_t maxsize) {
24,288✔
43
  if (str == NULL) {
24,288!
44
    return false;
×
45
  }
46

47
  size_t len = strlen(str);
24,288✔
48
  if (len <= 0 || len > maxsize) {
24,288!
49
    return false;
×
50
  }
51

52
  return true;
24,290✔
53
}
54

55
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
12,130✔
56

57
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_LEN - 1); }
12,128✔
58

59
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
30✔
60

61
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
12,130✔
62
  char key[512] = {0};
12,130✔
63
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
12,130✔
64
  return taosStrdup(key);
12,130✔
65
}
66

67
bool chkRequestKilled(void* param) {
34,921,419✔
68
  bool         killed = false;
34,921,419✔
69
  SRequestObj* pRequest = acquireRequest((int64_t)param);
34,921,419✔
70
  if (NULL == pRequest || pRequest->killed) {
35,500,162!
71
    killed = true;
2✔
72
  }
73

74
  (void)releaseRequest((int64_t)param);
35,500,162✔
75

76
  return killed;
35,414,822✔
77
}
78

79
void cleanupAppInfo() {
×
80
  taosHashCleanup(appInfo.pInstMap);
×
81
  taosHashCleanup(appInfo.pInstMapByClusterId);
×
82
}
×
83

84
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
85
                               SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
86

87
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
12,132✔
88
                              uint16_t port, int connType, STscObj** pObj) {
89
  TSC_ERR_RET(taos_init());
12,132!
90
  if (!validateUserName(user)) {
12,131!
91
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
×
92
  }
93

94
  char localDb[TSDB_DB_NAME_LEN] = {0};
12,130✔
95
  if (db != NULL && strlen(db) > 0) {
12,130!
96
    if (!validateDbName(db)) {
30!
97
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
×
98
    }
99

100
    tstrncpy(localDb, db, sizeof(localDb));
30✔
101
    (void)strdequote(localDb);
30✔
102
  }
103

104
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
12,130✔
105
  if (auth == NULL) {
12,130✔
106
    if (!validatePassword(pass)) {
12,128!
107
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
×
108
    }
109

110
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
12,131✔
111
  } else {
112
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
2✔
113
  }
114

115
  SCorEpSet epSet = {0};
12,129✔
116
  if (ip) {
12,129✔
117
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
8,501✔
118
  } else {
119
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
3,628!
120
  }
121

122
  if (port) {
12,130✔
123
    epSet.epSet.eps[0].port = port;
5,740✔
124
    epSet.epSet.eps[1].port = port;
5,740✔
125
  }
126

127
  char* key = getClusterKey(user, secretEncrypt, ip, port);
12,130✔
128
  if (NULL == key) {
12,130!
129
    TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
×
130
  }
131
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
12,130!
132
          user, db, key);
133
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
27,892✔
134
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
15,762!
135
  }
136

137
  SAppInstInfo** pInst = NULL;
12,130✔
138
  int32_t        code = taosThreadMutexLock(&appInfo.mutex);
12,130✔
139
  if (TSDB_CODE_SUCCESS != code) {
12,130!
140
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
141
    TSC_ERR_RET(code);
×
142
  }
143

144
  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
12,130✔
145
  SAppInstInfo* p = NULL;
12,130✔
146
  if (pInst == NULL) {
12,130✔
147
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
4,349✔
148
    if (NULL == p) {
4,349!
149
      TSC_ERR_JRET(terrno);
×
150
    }
151
    p->mgmtEp = epSet;
4,349✔
152
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
4,349✔
153
    if (TSDB_CODE_SUCCESS != code) {
4,349!
154
      taosMemoryFree(p);
×
155
      TSC_ERR_JRET(code);
×
156
    }
157
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
4,349✔
158
    if (TSDB_CODE_SUCCESS != code) {
4,349!
159
      taosMemoryFree(p);
×
160
      TSC_ERR_JRET(code);
×
161
    }
162
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
4,349✔
163
    if (TSDB_CODE_SUCCESS != code) {
4,349!
164
      destroyAppInst(&p);
×
165
      TSC_ERR_JRET(code);
×
166
    }
167
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
4,349✔
168
    if (TSDB_CODE_SUCCESS != code) {
4,349!
169
      destroyAppInst(&p);
×
170
      TSC_ERR_JRET(code);
×
171
    }
172
    p->instKey = key;
4,349✔
173
    key = NULL;
4,349✔
174
    tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
4,349✔
175

176
    pInst = &p;
4,349✔
177
  } else {
178
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
7,781!
179
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
×
180
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
×
181
    }
182
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
183
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
7,781✔
184
  }
185

186
_return:
12,130✔
187

188
  if (TSDB_CODE_SUCCESS != code) {
12,130!
189
    (void)taosThreadMutexUnlock(&appInfo.mutex);
×
190
    taosMemoryFreeClear(key);
×
191
    return code;
×
192
  } else {
193
    code = taosThreadMutexUnlock(&appInfo.mutex);
12,130✔
194
    taosMemoryFreeClear(key);
12,130✔
195
    if (TSDB_CODE_SUCCESS != code) {
12,130!
196
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
197
      return code;
×
198
    }
199
    return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType, pObj);
12,130✔
200
  }
201
}
202

203
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
204
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
205
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
206
//     return *ppAppInstInfo;
207
//   } else {
208
//     return NULL;
209
//   }
210
// }
211

212
void freeQueryParam(SSyncQueryParam* param) {
667✔
213
  if (param == NULL) return;
667!
214
  if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
667!
215
    tscError("failed to destroy semaphore in freeQueryParam");
×
216
  }
217
  taosMemoryFree(param);
667✔
218
}
219

220
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
10,844,011✔
221
                     SRequestObj** pRequest, int64_t reqid) {
222
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
10,844,011✔
223
  if (TSDB_CODE_SUCCESS != code) {
10,850,552!
224
    tscError("failed to malloc sqlObj, %s", sql);
×
225
    return code;
×
226
  }
227

228
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
10,850,552✔
229
  if ((*pRequest)->sqlstr == NULL) {
10,840,590!
230
    tscError("0x%" PRIx64 " failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
231
    destroyRequest(*pRequest);
×
232
    *pRequest = NULL;
×
233
    return terrno;
×
234
  }
235

236
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
10,840,590✔
237
  (*pRequest)->sqlstr[sqlLen] = 0;
10,847,301✔
238
  (*pRequest)->sqlLen = sqlLen;
10,847,301✔
239
  (*pRequest)->validateOnly = validateSql;
10,847,301✔
240
  (*pRequest)->isStmtBind = false;
10,847,301✔
241

242
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
10,847,301✔
243

244
  STscObj* pTscObj = (*pRequest)->pTscObj;
10,847,301✔
245
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
10,847,301✔
246
                             sizeof((*pRequest)->self));
247
  if (err) {
10,824,726!
248
    tscError("%" PRId64 " failed to add to request container,QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
249
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
250
    destroyRequest(*pRequest);
×
251
    *pRequest = NULL;
×
252
    return terrno;
×
253
  }
254

255
  (*pRequest)->allocatorRefId = -1;
10,824,726✔
256
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
10,824,726!
257
    if (TSDB_CODE_SUCCESS !=
1,252,443!
258
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
1,252,433✔
259
      tscError("%" PRId64 " failed to create node allocator,QID:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self,
×
260
               (*pRequest)->requestId, pTscObj->id, sql);
261
      destroyRequest(*pRequest);
×
262
      *pRequest = NULL;
×
263
      return terrno;
×
264
    }
265
  }
266

267
  tscDebugL("0x%" PRIx64 " SQL: %s,QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
10,845,749✔
268
  return TSDB_CODE_SUCCESS;
10,841,041✔
269
}
270

271
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
509✔
272
  int32_t code =
273
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
509✔
274
  if (TSDB_CODE_SUCCESS == code) {
509!
275
    pRequest->relation.prevRefId = (*pNewRequest)->self;
509✔
276
    (*pNewRequest)->relation.nextRefId = pRequest->self;
509✔
277
    (*pNewRequest)->relation.userRefId = pRequest->self;
509✔
278
    (*pNewRequest)->isSubReq = true;
509✔
279
  }
280
  return code;
509✔
281
}
282

283
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
51✔
284
  STscObj* pTscObj = pRequest->pTscObj;
51✔
285

286
  SParseContext cxt = {.requestId = pRequest->requestId,
51✔
287
                       .requestRid = pRequest->self,
51✔
288
                       .acctId = pTscObj->acctId,
51✔
289
                       .db = pRequest->pDb,
51✔
290
                       .topicQuery = topicQuery,
291
                       .pSql = pRequest->sqlstr,
51✔
292
                       .sqlLen = pRequest->sqlLen,
51✔
293
                       .pMsg = pRequest->msgBuf,
51✔
294
                       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
295
                       .pTransporter = pTscObj->pAppInfo->pTransporter,
51✔
296
                       .pStmtCb = pStmtCb,
297
                       .pUser = pTscObj->user,
51✔
298
                       .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
51✔
299
                       .enableSysInfo = pTscObj->sysInfo,
51✔
300
                       .svrVer = pTscObj->sVer,
51✔
301
                       .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
51✔
302
                       .isStmtBind = pRequest->isStmtBind,
51✔
303
                       .setQueryFp = setQueryRequest};
304

305
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
51✔
306
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
51✔
307
  if (code != TSDB_CODE_SUCCESS) {
51!
308
    return code;
×
309
  }
310

311
  code = qParseSql(&cxt, pQuery);
51✔
312
  if (TSDB_CODE_SUCCESS == code) {
51✔
313
    if ((*pQuery)->haveResultSet) {
48!
314
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
×
315
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
×
316
    }
317
  }
318

319
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
51!
320
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
48✔
321
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
48✔
322
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
48✔
323
  }
324

325
  taosArrayDestroy(cxt.pTableMetaPos);
51✔
326
  taosArrayDestroy(cxt.pTableVgroupPos);
51✔
327

328
  return code;
51✔
329
}
330

331
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
×
332
  SRetrieveTableRsp* pRsp = NULL;
×
333
  int8_t             biMode = atomic_load_8(&pRequest->pTscObj->biMode);
×
334
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode);
×
335
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
×
336
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4);
×
337
  }
338

339
  return code;
×
340
}
341

342
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
939✔
343
  // drop table if exists not_exists_table
344
  if (NULL == pQuery->pCmdMsg) {
939!
345
    return TSDB_CODE_SUCCESS;
×
346
  }
347

348
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
939✔
349
  pRequest->type = pMsgInfo->msgType;
939✔
350
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
939✔
351
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
939✔
352

353
  STscObj*      pTscObj = pRequest->pTscObj;
939✔
354
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
939✔
355

356
  // int64_t transporterId = 0;
357
  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
939!
358
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
939!
359
  return TSDB_CODE_SUCCESS;
939✔
360
}
361

362
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
20,818,865✔
363

364
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
101,639✔
365
  SRetrieveTableRsp* pRsp = NULL;
101,639✔
366
  if (pRequest->validateOnly) {
101,639✔
367
    doRequestCallback(pRequest, 0);
18✔
368
    return;
18✔
369
  }
370

371
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
101,621✔
372
                              atomic_load_8(&pRequest->pTscObj->biMode));
101,621✔
373
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
101,621✔
374
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4);
78,056✔
375
  }
376

377
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
101,621✔
378
  pRequest->code = code;
101,621✔
379

380
  if (pRequest->code != TSDB_CODE_SUCCESS) {
101,621✔
381
    pResultInfo->numOfRows = 0;
13✔
382
    tscError("0x%" PRIx64 " fetch results failed, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(code),
13!
383
             pRequest->requestId);
384
  } else {
385
    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d,QID:0x%" PRIx64,
101,608✔
386
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
387
             pRequest->requestId);
388
  }
389

390
  doRequestCallback(pRequest, code);
101,621✔
391
}
392

393
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
37,374✔
394
  if (pRequest->validateOnly) {
37,374!
395
    doRequestCallback(pRequest, 0);
×
396
    return TSDB_CODE_SUCCESS;
×
397
  }
398

399
  // drop table if exists not_exists_table
400
  if (NULL == pQuery->pCmdMsg) {
37,374✔
401
    doRequestCallback(pRequest, 0);
1✔
402
    return TSDB_CODE_SUCCESS;
1✔
403
  }
404

405
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
37,373✔
406
  pRequest->type = pMsgInfo->msgType;
37,373✔
407
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
37,373✔
408
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
37,373✔
409

410
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
37,373✔
411
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
37,373✔
412

413
  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
37,376✔
414
  if (code) {
37,376!
415
    doRequestCallback(pRequest, code);
×
416
  }
417
  return code;
37,376✔
418
}
419

420
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
217,085✔
421
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
217,085✔
422
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
217,085✔
423

424
  if (node1->load < node2->load) {
217,085!
425
    return -1;
×
426
  }
427

428
  return node1->load > node2->load;
217,085✔
429
}
430

431
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
77,591✔
432
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
77,591!
433
  if (pInfo->pQnodeList) {
77,591✔
434
    taosArrayDestroy(pInfo->pQnodeList);
77,006✔
435
    pInfo->pQnodeList = NULL;
77,006✔
436
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
77,006✔
437
  }
438

439
  if (pNodeList) {
77,591!
440
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
77,591✔
441
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
77,591✔
442
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
77,591✔
443
             taosArrayGetSize(pInfo->pQnodeList));
444
  }
445
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
77,591!
446

447
  return TSDB_CODE_SUCCESS;
77,591✔
448
}
449

450
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
10,852,203✔
451
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
10,852,203✔
452
    *required = false;
10,214,326✔
453
    return TSDB_CODE_SUCCESS;
10,214,326✔
454
  }
455

456
  int32_t       code = TSDB_CODE_SUCCESS;
637,877✔
457
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
637,877✔
458
  *required = false;
637,877✔
459

460
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
637,877!
461
  *required = (NULL == pInfo->pQnodeList);
637,877✔
462
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
637,877!
463
  return TSDB_CODE_SUCCESS;
637,877✔
464
}
465

466
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
×
467
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
×
468
  int32_t       code = 0;
×
469

470
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
×
471
  if (pInfo->pQnodeList) {
×
472
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
×
473
  }
474
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
×
475
  if (NULL == *pNodeList) {
×
476
    SCatalog* pCatalog = NULL;
×
477
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
478
    if (TSDB_CODE_SUCCESS == code) {
×
479
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
×
480
      if (NULL == pNodeList) {
×
481
        TSC_ERR_RET(terrno);
×
482
      }
483
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
484
                               .requestId = pRequest->requestId,
×
485
                               .requestObjRefId = pRequest->self,
×
486
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
487
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
×
488
    }
489

490
    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
×
491
      code = updateQnodeList(pInfo, *pNodeList);
×
492
    }
493
  }
494

495
  return code;
×
496
}
497

498
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
1,273✔
499
  pRequest->type = pQuery->msgType;
1,273✔
500
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
1,273✔
501

502
  SPlanContext cxt = {.queryId = pRequest->requestId,
2,547✔
503
                      .acctId = pRequest->pTscObj->acctId,
1,273✔
504
                      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
1,273✔
505
                      .pAstRoot = pQuery->pRoot,
1,274✔
506
                      .showRewrite = pQuery->showRewrite,
1,274✔
507
                      .pMsg = pRequest->msgBuf,
1,274✔
508
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
509
                      .pUser = pRequest->pTscObj->user,
1,274✔
510
                      .sysInfo = pRequest->pTscObj->sysInfo};
1,274✔
511

512
  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
1,274✔
513
}
514

515
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
1,281,303✔
516
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
1,281,303!
517
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
518
    return TSDB_CODE_INVALID_PARA;
×
519
  }
520

521
  pResInfo->numOfCols = numOfCols;
1,281,323✔
522
  if (pResInfo->fields != NULL) {
1,281,323✔
523
    taosMemoryFree(pResInfo->fields);
35✔
524
  }
525
  if (pResInfo->userFields != NULL) {
1,281,323✔
526
    taosMemoryFree(pResInfo->userFields);
35✔
527
  }
528
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
1,281,323✔
529
  if (NULL == pResInfo->fields) return terrno;
1,281,314!
530
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
1,281,314✔
531
  if (NULL == pResInfo->userFields) {
1,281,324!
532
    taosMemoryFree(pResInfo->fields);
×
533
    return terrno;
×
534
  }
535
  if (numOfCols != pResInfo->numOfCols) {
1,281,334!
536
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
537
    return TSDB_CODE_FAILED;
×
538
  }
539

540
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
6,287,183✔
541
    pResInfo->fields[i].bytes = pSchema[i].bytes;
5,005,849✔
542
    pResInfo->fields[i].type = pSchema[i].type;
5,005,849✔
543

544
    pResInfo->userFields[i].bytes = pSchema[i].bytes;
5,005,849✔
545
    pResInfo->userFields[i].type = pSchema[i].type;
5,005,849✔
546

547
    if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_VARBINARY ||
5,005,849✔
548
        pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
4,022,649✔
549
      pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
983,247✔
550
    } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
4,022,602✔
551
      pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
273,877✔
552
    }
553

554
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
5,005,849✔
555
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
5,005,849✔
556
  }
557
  return TSDB_CODE_SUCCESS;
1,281,334✔
558
}
559

560
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
925,750✔
561
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
925,750!
562
      precision != TSDB_TIME_PRECISION_NANO) {
563
    return;
×
564
  }
565

566
  pResInfo->precision = precision;
925,750✔
567
}
568

569
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
608,511✔
570
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
608,511✔
571
  if (NULL == nodeList) {
608,542!
572
    return terrno;
×
573
  }
574
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
608,544✔
575

576
  int32_t dbNum = taosArrayGetSize(pDbVgList);
608,544✔
577
  for (int32_t i = 0; i < dbNum; ++i) {
1,212,055✔
578
    SArray* pVg = taosArrayGetP(pDbVgList, i);
603,513✔
579
    if (NULL == pVg) {
603,505!
580
      continue;
×
581
    }
582
    int32_t vgNum = taosArrayGetSize(pVg);
603,505✔
583
    if (vgNum <= 0) {
603,517✔
584
      continue;
606✔
585
    }
586

587
    for (int32_t j = 0; j < vgNum; ++j) {
3,050,919✔
588
      SVgroupInfo* pInfo = taosArrayGet(pVg, j);
2,448,024✔
589
      if (NULL == pInfo) {
2,447,893!
590
        taosArrayDestroy(nodeList);
×
591
        return TSDB_CODE_OUT_OF_RANGE;
×
592
      }
593
      SQueryNodeLoad load = {0};
2,447,893✔
594
      load.addr.nodeId = pInfo->vgId;
2,447,893✔
595
      load.addr.epSet = pInfo->epSet;
2,447,893✔
596

597
      if (NULL == taosArrayPush(nodeList, &load)) {
2,448,008!
598
        taosArrayDestroy(nodeList);
×
599
        return terrno;
×
600
      }
601
    }
602
  }
603

604
  int32_t vnodeNum = taosArrayGetSize(nodeList);
608,542✔
605
  if (vnodeNum > 0) {
608,564✔
606
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
602,308✔
607
    goto _return;
602,307✔
608
  }
609

610
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
6,256✔
611
  if (mnodeNum <= 0) {
6,257!
612
    tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
×
613
    goto _return;
×
614
  }
615

616
  void* pData = taosArrayGet(pMnodeList, 0);
6,257✔
617
  if (NULL == pData) {
6,257!
618
    taosArrayDestroy(nodeList);
×
619
    return TSDB_CODE_OUT_OF_RANGE;
×
620
  }
621
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
6,257!
622
    taosArrayDestroy(nodeList);
×
623
    return terrno;
×
624
  }
625

626
  tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
6,257✔
627

628
_return:
5,085✔
629

630
  *pNodeList = nodeList;
608,563✔
631

632
  return TSDB_CODE_SUCCESS;
608,563✔
633
}
634

635
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
292,059✔
636
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
292,059✔
637
  if (NULL == nodeList) {
292,059!
638
    return terrno;
×
639
  }
640

641
  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
292,059✔
642
  if (qNodeNum > 0) {
292,059✔
643
    void* pData = taosArrayGet(pQnodeList, 0);
292,053✔
644
    if (NULL == pData) {
292,053!
645
      taosArrayDestroy(nodeList);
×
646
      return TSDB_CODE_OUT_OF_RANGE;
×
647
    }
648
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
292,053!
649
      taosArrayDestroy(nodeList);
×
650
      return terrno;
×
651
    }
652
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
292,053✔
653
    goto _return;
292,053✔
654
  }
655

656
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
6✔
657
  if (mnodeNum <= 0) {
6✔
658
    tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
4!
659
    goto _return;
4✔
660
  }
661

662
  void* pData = taosArrayGet(pMnodeList, 0);
2✔
663
  if (NULL == pData) {
2!
664
    taosArrayDestroy(nodeList);
×
665
    return TSDB_CODE_OUT_OF_RANGE;
×
666
  }
667
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
2!
668
    taosArrayDestroy(nodeList);
×
669
    return terrno;
×
670
  }
671

672
  tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
2!
673

674
_return:
×
675

676
  *pNodeList = nodeList;
292,059✔
677

678
  return TSDB_CODE_SUCCESS;
292,059✔
679
}
680

681
void freeVgList(void* list) {
1,039✔
682
  SArray* pList = *(SArray**)list;
1,039✔
683
  taosArrayDestroy(pList);
1,039✔
684
}
1,038✔
685

686
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
899,291✔
687
  SArray* pDbVgList = NULL;
899,291✔
688
  SArray* pQnodeList = NULL;
899,291✔
689
  FDelete fp = NULL;
899,291✔
690
  int32_t code = 0;
899,291✔
691

692
  switch (tsQueryPolicy) {
899,291!
693
    case QUERY_POLICY_VNODE:
607,261✔
694
    case QUERY_POLICY_CLIENT: {
695
      if (pResultMeta) {
607,261!
696
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
607,286✔
697
        if (NULL == pDbVgList) {
607,256!
698
          code = terrno;
×
699
          goto _return;
×
700
        }
701
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
607,256✔
702
        for (int32_t i = 0; i < dbNum; ++i) {
1,209,711✔
703
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
602,461✔
704
          if (pRes->code || NULL == pRes->pRes) {
602,457!
705
            continue;
×
706
          }
707

708
          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
1,204,948!
709
            code = terrno;
×
710
            goto _return;
×
711
          }
712
        }
713
      } else {
714
        fp = freeVgList;
×
715

716
        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
×
717
        if (dbNum > 0) {
×
718
          SCatalog*     pCtg = NULL;
×
719
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
×
720
          code = catalogGetHandle(pInst->clusterId, &pCtg);
×
721
          if (code != TSDB_CODE_SUCCESS) {
×
722
            goto _return;
×
723
          }
724

725
          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
×
726
          if (NULL == pDbVgList) {
×
727
            code = terrno;
×
728
            goto _return;
×
729
          }
730
          SArray* pVgList = NULL;
×
731
          for (int32_t i = 0; i < dbNum; ++i) {
×
732
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
×
733
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
×
734
                                     .requestId = pRequest->requestId,
×
735
                                     .requestObjRefId = pRequest->self,
×
736
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
×
737

738
            // catalogGetDBVgList will handle dbFName == null.
739
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
×
740
            if (code) {
×
741
              goto _return;
×
742
            }
743

744
            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
×
745
              code = terrno;
×
746
              goto _return;
×
747
            }
748
          }
749
        }
750
      }
751

752
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
607,250✔
753
      break;
607,289✔
754
    }
755
    case QUERY_POLICY_HYBRID:
292,057✔
756
    case QUERY_POLICY_QNODE: {
757
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
297,486!
758
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
5,429✔
759
        if (pRes->code) {
5,429!
760
          pQnodeList = NULL;
×
761
        } else {
762
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
5,429✔
763
          if (NULL == pQnodeList) {
5,429!
764
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
765
            goto _return;
×
766
          }
767
        }
768
      } else {
769
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
286,629✔
770
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
286,629!
771
        if (pInst->pQnodeList) {
286,630!
772
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
286,630✔
773
          if (NULL == pQnodeList) {
286,630!
774
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
775
            goto _return;
×
776
          }
777
        }
778
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
286,630!
779
      }
780

781
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
292,059✔
782
      break;
292,059✔
783
    }
784
    default:
×
785
      tscError("unknown query policy: %d", tsQueryPolicy);
×
786
      return TSDB_CODE_APP_ERROR;
×
787
  }
788

789
_return:
899,348✔
790
  taosArrayDestroyEx(pDbVgList, fp);
899,348✔
791
  taosArrayDestroy(pQnodeList);
899,354✔
792

793
  return code;
899,358✔
794
}
795

796
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
1,274✔
797
  SArray* pDbVgList = NULL;
1,274✔
798
  SArray* pQnodeList = NULL;
1,274✔
799
  int32_t code = 0;
1,274✔
800

801
  switch (tsQueryPolicy) {
1,274!
802
    case QUERY_POLICY_VNODE:
1,274✔
803
    case QUERY_POLICY_CLIENT: {
804
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
1,274✔
805
      if (dbNum > 0) {
1,274✔
806
        SCatalog*     pCtg = NULL;
1,039✔
807
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
1,039✔
808
        code = catalogGetHandle(pInst->clusterId, &pCtg);
1,039✔
809
        if (code != TSDB_CODE_SUCCESS) {
1,039!
810
          goto _return;
×
811
        }
812

813
        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
1,039✔
814
        if (NULL == pDbVgList) {
1,039!
815
          code = terrno;
×
816
          goto _return;
×
817
        }
818
        SArray* pVgList = NULL;
1,039✔
819
        for (int32_t i = 0; i < dbNum; ++i) {
2,078✔
820
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
1,039✔
821
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
1,039✔
822
                                   .requestId = pRequest->requestId,
1,039✔
823
                                   .requestObjRefId = pRequest->self,
1,039✔
824
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
1,039✔
825

826
          // catalogGetDBVgList will handle dbFName == null.
827
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
1,039✔
828
          if (code) {
1,039!
829
            goto _return;
×
830
          }
831

832
          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
1,039!
833
            code = terrno;
×
834
            goto _return;
×
835
          }
836
        }
837
      }
838

839
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
1,274✔
840
      break;
1,274✔
841
    }
842
    case QUERY_POLICY_HYBRID:
×
843
    case QUERY_POLICY_QNODE: {
844
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
×
845

846
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
×
847
      break;
×
848
    }
849
    default:
×
850
      tscError("unknown query policy: %d", tsQueryPolicy);
×
851
      return TSDB_CODE_APP_ERROR;
×
852
  }
853

854
_return:
1,274✔
855

856
  taosArrayDestroyEx(pDbVgList, freeVgList);
1,274✔
857
  taosArrayDestroy(pQnodeList);
1,274✔
858

859
  return code;
1,274✔
860
}
861

862
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
1,273✔
863
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
1,273✔
864

865
  SExecResult      res = {0};
1,273✔
866
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
1,273✔
867
                           .requestId = pRequest->requestId,
1,273✔
868
                           .requestObjRefId = pRequest->self};
1,273✔
869
  SSchedulerReq    req = {
2,547✔
870
         .syncReq = true,
871
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
1,273✔
872
         .pConn = &conn,
873
         .pNodeList = pNodeList,
874
         .pDag = pDag,
875
         .sql = pRequest->sqlstr,
1,273✔
876
         .startTs = pRequest->metric.start,
1,273✔
877
         .execFp = NULL,
878
         .cbParam = NULL,
879
         .chkKillFp = chkRequestKilled,
880
         .chkKillParam = (void*)pRequest->self,
1,273✔
881
         .pExecRes = &res,
882
         .source = pRequest->source,
1,273✔
883
         .pWorkerCb = getTaskPoolWorkerCb(),
1,273✔
884
  };
885

886
  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
1,274✔
887

888
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
1,274✔
889
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
1,274✔
890

891
  if (code != TSDB_CODE_SUCCESS) {
1,274!
892
    schedulerFreeJob(&pRequest->body.queryJob, 0);
×
893

894
    pRequest->code = code;
×
895
    terrno = code;
×
896
    return pRequest->code;
×
897
  }
898

899
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
1,274!
900
      TDMT_VND_CREATE_TABLE == pRequest->type) {
170✔
901
    pRequest->body.resInfo.numOfRows = res.numOfRows;
1,235✔
902
    if (TDMT_VND_SUBMIT == pRequest->type) {
1,235✔
903
      STscObj*            pTscObj = pRequest->pTscObj;
1,103✔
904
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
1,103✔
905
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
1,103✔
906
    }
907

908
    schedulerFreeJob(&pRequest->body.queryJob, 0);
1,236✔
909
  }
910

911
  pRequest->code = res.code;
1,274✔
912
  terrno = res.code;
1,274✔
913
  return pRequest->code;
1,274✔
914
}
915

916
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
9,566,401✔
917
  SArray*      pArray = NULL;
9,566,401✔
918
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
9,566,401✔
919
  if (NULL == pRsp->aCreateTbRsp) {
9,566,401✔
920
    return TSDB_CODE_SUCCESS;
9,515,576✔
921
  }
922

923
  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
50,825✔
924
  for (int32_t i = 0; i < tbNum; ++i) {
112,869✔
925
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
59,402✔
926
    if (pTbRsp->pMeta) {
59,402✔
927
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
39,780!
928
    }
929
  }
930

931
  return TSDB_CODE_SUCCESS;
53,467✔
932
}
933

934
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
703,911✔
935
  int32_t code = 0;
703,911✔
936
  SArray* pArray = NULL;
703,911✔
937
  SArray* pTbArray = (SArray*)res;
703,911✔
938
  int32_t tbNum = taosArrayGetSize(pTbArray);
703,911✔
939
  if (tbNum <= 0) {
703,909!
940
    return TSDB_CODE_SUCCESS;
×
941
  }
942

943
  pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
703,909✔
944
  if (NULL == pArray) {
703,926✔
945
    return terrno;
3✔
946
  }
947

948
  for (int32_t i = 0; i < tbNum; ++i) {
2,002,104✔
949
    STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
1,298,179✔
950
    if (NULL == tbInfo) {
1,298,179!
951
      code = terrno;
×
952
      goto _return;
×
953
    }
954
    STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion};
1,298,179✔
955
    if (NULL == taosArrayPush(pArray, &tbSver)) {
1,298,181!
956
      code = terrno;
×
957
      goto _return;
×
958
    }
959
  }
960

961
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
703,925✔
962
                           .requestId = pRequest->requestId,
703,925✔
963
                           .requestObjRefId = pRequest->self,
703,925✔
964
                           .mgmtEps = *epset};
965

966
  code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
703,925✔
967

968
_return:
703,910✔
969

970
  taosArrayDestroy(pArray);
703,910✔
971
  return code;
703,924✔
972
}
973

974
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
3,349✔
975
  return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
3,349✔
976
}
977

978
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
137,459✔
979
  return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
137,459✔
980
}
981

982
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
10,528,031✔
983
  if (NULL == pRequest->body.resInfo.execRes.res) {
10,528,031✔
984
    return pRequest->code;
225,402✔
985
  }
986

987
  SCatalog*     pCatalog = NULL;
10,302,629✔
988
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
10,302,629✔
989

990
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
10,303,419✔
991
  if (code) {
10,303,497!
992
    return code;
×
993
  }
994

995
  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
10,303,497✔
996
  SExecResult* pRes = &pRequest->body.resInfo.execRes;
10,340,659✔
997

998
  switch (pRes->msgType) {
10,340,659✔
999
    case TDMT_VND_ALTER_TABLE:
1,034✔
1000
    case TDMT_MND_ALTER_STB: {
1001
      code = handleAlterTbExecRes(pRes->res, pCatalog);
1,034✔
1002
      break;
1,034✔
1003
    }
1004
    case TDMT_VND_CREATE_TABLE: {
57,525✔
1005
      SArray* pList = (SArray*)pRes->res;
57,525✔
1006
      int32_t num = taosArrayGetSize(pList);
57,525✔
1007
      for (int32_t i = 0; i < num; ++i) {
148,113✔
1008
        void* res = taosArrayGetP(pList, i);
90,589✔
1009
        // handleCreateTbExecRes will handle res == null
1010
        code = handleCreateTbExecRes(res, pCatalog);
90,584✔
1011
      }
1012
      break;
57,524✔
1013
    }
1014
    case TDMT_MND_CREATE_STB: {
843✔
1015
      code = handleCreateTbExecRes(pRes->res, pCatalog);
843✔
1016
      break;
843✔
1017
    }
1018
    case TDMT_VND_SUBMIT: {
9,570,847✔
1019
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
9,570,847✔
1020

1021
      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
9,575,103✔
1022
      break;
9,567,655✔
1023
    }
1024
    case TDMT_SCH_QUERY:
703,925✔
1025
    case TDMT_SCH_MERGE_QUERY: {
1026
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
703,925✔
1027
      break;
703,903✔
1028
    }
1029
    default:
6,485✔
1030
      tscError("0x%" PRIx64 ", invalid exec result for request type %d,QID:0x%" PRIx64, pRequest->self, pRequest->type,
6,485!
1031
               pRequest->requestId);
1032
      code = TSDB_CODE_APP_ERROR;
×
1033
  }
1034

1035
  return code;
10,330,959✔
1036
}
1037

1038
static bool incompletaFileParsing(SNode* pStmt) {
10,553,585✔
1039
  return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
10,553,585✔
1040
}
1041

1042
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
487✔
1043
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
487✔
1044

1045
  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
487✔
1046
  if (TSDB_CODE_SUCCESS == code) {
487!
1047
    int64_t analyseStart = taosGetTimestampUs();
487✔
1048
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
487✔
1049
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
487✔
1050
  }
1051

1052
  if (TSDB_CODE_SUCCESS == code) {
487!
1053
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
487✔
1054
  }
1055

1056
  code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
487✔
1057
  handleQueryAnslyseRes(pWrapper, NULL, code);
487✔
1058
}
487✔
1059

1060
void returnToUser(SRequestObj* pRequest) {
80,993✔
1061
  if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
80,993!
1062
    // return to client
1063
    doRequestCallback(pRequest, pRequest->code);
80,993✔
1064
    return;
80,993✔
1065
  }
1066

1067
  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
×
1068
  if (pUserReq) {
×
1069
    pUserReq->code = pRequest->code;
×
1070
    // return to client
1071
    doRequestCallback(pUserReq, pUserReq->code);
×
1072
    (void)releaseRequest(pRequest->relation.userRefId);
×
1073
    return;
×
1074
  } else {
1075
    tscError("0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there,QID:0x%" PRIx64, pRequest->self,
×
1076
             pRequest->relation.userRefId, pRequest->requestId);
1077
  }
1078
}
1079

1080
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
487✔
1081
  int64_t     lastTs = 0;
487✔
1082
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
487✔
1083
  int32_t     numOfFields = taos_num_fields(pRes);
487✔
1084

1085
  int32_t code = createDataBlock(pBlock);
487✔
1086
  if (code) {
487!
1087
    return code;
×
1088
  }
1089

1090
  for (int32_t i = 0; i < numOfFields; ++i) {
1,948✔
1091
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
1,461✔
1092
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
1,461✔
1093
    if (TSDB_CODE_SUCCESS != code) {
1,461!
1094
      blockDataDestroy(*pBlock);
×
1095
      return code;
×
1096
    }
1097
  }
1098

1099
  code = blockDataEnsureCapacity(*pBlock, numOfRows);
487✔
1100
  if (TSDB_CODE_SUCCESS != code) {
487!
1101
    blockDataDestroy(*pBlock);
×
1102
    return code;
×
1103
  }
1104

1105
  for (int32_t i = 0; i < numOfRows; ++i) {
1,480✔
1106
    TAOS_ROW pRow = taos_fetch_row(pRes);
993✔
1107
    if (NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
993!
1108
      tscError("invalid data from vnode");
×
1109
      blockDataDestroy(*pBlock);
×
1110
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1111
    }
1112
    int64_t ts = *(int64_t*)pRow[0];
993✔
1113
    if (lastTs < ts) {
993✔
1114
      lastTs = ts;
568✔
1115
    }
1116

1117
    for (int32_t j = 0; j < numOfFields; ++j) {
3,972✔
1118
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
2,979✔
1119
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
2,979✔
1120
      if (TSDB_CODE_SUCCESS != code) {
2,979!
1121
        blockDataDestroy(*pBlock);
×
1122
        return code;
×
1123
      }
1124
    }
1125

1126
    tscDebug("lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]);
993✔
1127
  }
1128

1129
  (*pBlock)->info.window.ekey = lastTs;
487✔
1130
  (*pBlock)->info.rows = numOfRows;
487✔
1131

1132
  tscDebug("lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
487✔
1133
  return TSDB_CODE_SUCCESS;
487✔
1134
}
1135

1136
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
487✔
1137
  SRequestObj* pRequest = (SRequestObj*)res;
487✔
1138
  if (pRequest->code) {
487!
1139
    returnToUser(pRequest);
×
1140
    return;
×
1141
  }
1142

1143
  SSDataBlock* pBlock = NULL;
487✔
1144
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
487✔
1145
  if (TSDB_CODE_SUCCESS != pRequest->code) {
487!
1146
    tscError("0x%" PRIx64 ", create result block failed,QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
×
1147
             tstrerror(pRequest->code));
1148
    returnToUser(pRequest);
×
1149
    return;
×
1150
  }
1151

1152
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
487✔
1153
  if (pNextReq) {
487!
1154
    continuePostSubQuery(pNextReq, pBlock);
487✔
1155
    (void)releaseRequest(pRequest->relation.nextRefId);
487✔
1156
  } else {
1157
    tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there,QID:0x%" PRIx64, pRequest->self,
×
1158
             pRequest->relation.nextRefId, pRequest->requestId);
1159
  }
1160

1161
  blockDataDestroy(pBlock);
487✔
1162
}
1163

1164
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
487✔
1165
  SRequestObj* pRequest = pWrapper->pRequest;
487✔
1166
  if (TD_RES_QUERY(pRequest)) {
487!
1167
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
487✔
1168
    return;
487✔
1169
  }
1170

1171
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1172
  if (pNextReq) {
×
1173
    continuePostSubQuery(pNextReq, NULL);
×
1174
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1175
  } else {
1176
    tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there,QID:0x%" PRIx64, pRequest->self,
×
1177
             pRequest->relation.nextRefId, pRequest->requestId);
1178
  }
1179
}
1180

1181
// todo refacto the error code  mgmt
1182
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
10,493,851✔
1183
  SSqlCallbackWrapper* pWrapper = param;
10,493,851✔
1184
  SRequestObj*         pRequest = pWrapper->pRequest;
10,493,851✔
1185
  STscObj*             pTscObj = pRequest->pTscObj;
10,493,851✔
1186

1187
  pRequest->code = code;
10,493,851✔
1188
  if (pResult) {
10,493,851✔
1189
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
10,492,909✔
1190
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
10,500,920✔
1191
  }
1192

1193
  int32_t type = pRequest->type;
10,501,862✔
1194
  if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
10,501,862✔
1195
    if (pResult) {
9,636,881!
1196
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;
9,644,498✔
1197

1198
      // record the insert rows
1199
      if (TDMT_VND_SUBMIT == type) {
9,644,498✔
1200
        SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
9,529,079✔
1201
        (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
9,529,079✔
1202
      }
1203
    }
1204
    schedulerFreeJob(&pRequest->body.queryJob, 0);
9,683,799✔
1205
  }
1206

1207
  taosMemoryFree(pResult);
10,540,937✔
1208
  tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(code),
10,561,379✔
1209
           pRequest->requestId);
1210

1211
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
10,558,380!
1212
    tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d,QID:0x%" PRIx64, pRequest->self,
573✔
1213
             tstrerror(code), pRequest->retry, pRequest->requestId);
1214
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
572!
1215
      tscError("0x%" PRIx64 " remove meta failed,QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1216
    }
1217
    restartAsyncQuery(pRequest, code);
573✔
1218
    return;
573✔
1219
  }
1220

1221
  tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
10,557,807!
1222
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
10,557,807!
1223
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
25,674!
1224
      tscError("0x%" PRIx64 " remove meta failed,QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1225
    }
1226
  }
1227

1228
  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
10,533,908✔
1229
  int32_t code1 = handleQueryExecRsp(pRequest);
10,533,908✔
1230
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
10,552,702!
1231
    pRequest->code = code1;
×
1232
  }
1233

1234
  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
21,106,744!
1235
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
10,550,030✔
1236
    continueInsertFromCsv(pWrapper, pRequest);
1,047✔
1237
    return;
20✔
1238
  }
1239

1240
  if (pRequest->relation.nextRefId) {
10,555,667✔
1241
    handlePostSubQuery(pWrapper);
487✔
1242
  } else {
1243
    destorySqlCallbackWrapper(pWrapper);
10,555,180✔
1244
    pRequest->pWrapper = NULL;
10,560,293✔
1245

1246
    // return to client
1247
    doRequestCallback(pRequest, code);
10,560,293✔
1248
  }
1249
}
1250

1251
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
2,213✔
1252
  int32_t code = 0;
2,213✔
1253
  int32_t subplanNum = 0;
2,213✔
1254

1255
  if (pQuery->pRoot) {
2,213✔
1256
    pRequest->stmtType = pQuery->pRoot->type;
1,274✔
1257
  }
1258

1259
  if (pQuery->pRoot && !pRequest->inRetry) {
2,213!
1260
    STscObj*            pTscObj = pRequest->pTscObj;
1,274✔
1261
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
1,274✔
1262
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
1,274✔
1263
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
1,258✔
1264
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
16!
1265
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
16✔
1266
    }
1267
  }
1268

1269
  pRequest->body.execMode = pQuery->execMode;
2,213✔
1270
  switch (pQuery->execMode) {
2,213!
1271
    case QUERY_EXEC_MODE_LOCAL:
×
1272
      if (!pRequest->validateOnly) {
×
1273
        if (NULL == pQuery->pRoot) {
×
1274
          terrno = TSDB_CODE_INVALID_PARA;
×
1275
          code = terrno;
×
1276
        } else {
1277
          code = execLocalCmd(pRequest, pQuery);
×
1278
        }
1279
      }
1280
      break;
×
1281
    case QUERY_EXEC_MODE_RPC:
939✔
1282
      if (!pRequest->validateOnly) {
939!
1283
        code = execDdlQuery(pRequest, pQuery);
939✔
1284
      }
1285
      break;
939✔
1286
    case QUERY_EXEC_MODE_SCHEDULE: {
1,274✔
1287
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
1,274✔
1288
      if (NULL == pMnodeList) {
1,274!
1289
        code = terrno;
×
1290
        break;
×
1291
      }
1292
      SQueryPlan* pDag = NULL;
1,274✔
1293
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
1,274✔
1294
      if (TSDB_CODE_SUCCESS == code) {
1,274!
1295
        pRequest->body.subplanNum = pDag->numOfSubplans;
1,274✔
1296
        if (!pRequest->validateOnly) {
1,274!
1297
          SArray* pNodeList = NULL;
1,274✔
1298
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
1,274✔
1299
          if (TSDB_CODE_SUCCESS == code) {
1,274!
1300
            code = scheduleQuery(pRequest, pDag, pNodeList);
1,274✔
1301
          }
1302
          taosArrayDestroy(pNodeList);
1,274✔
1303
        }
1304
      }
1305
      taosArrayDestroy(pMnodeList);
1,274✔
1306
      break;
1,274✔
1307
    }
1308
    case QUERY_EXEC_MODE_EMPTY_RESULT:
×
1309
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
1310
      break;
×
1311
    default:
×
1312
      break;
×
1313
  }
1314

1315
  if (!keepQuery) {
2,213!
1316
    qDestroyQuery(pQuery);
×
1317
  }
1318

1319
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
2,213!
1320
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
99!
1321
    if (TSDB_CODE_SUCCESS != ret) {
99!
1322
      tscError("0x%" PRIx64 " remove meta failed,code:%d,QID:0x%" PRIx64, pRequest->self, ret, pRequest->requestId);
×
1323
    }
1324
  }
1325

1326
  if (TSDB_CODE_SUCCESS == code) {
2,213✔
1327
    code = handleQueryExecRsp(pRequest);
2,211✔
1328
  }
1329

1330
  if (TSDB_CODE_SUCCESS != code) {
2,213✔
1331
    pRequest->code = code;
38✔
1332
  }
1333

1334
  if (res) {
2,213!
1335
    *res = pRequest->body.resInfo.execRes.res;
×
1336
    pRequest->body.resInfo.execRes.res = NULL;
×
1337
  }
1338
}
2,213✔
1339

1340
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
10,544,587✔
1341
                                 SSqlCallbackWrapper* pWrapper) {
1342
  int32_t code = TSDB_CODE_SUCCESS;
10,544,587✔
1343
  pRequest->type = pQuery->msgType;
10,544,587✔
1344
  SArray*     pMnodeList = NULL;
10,544,587✔
1345
  SQueryPlan* pDag = NULL;
10,544,587✔
1346
  int64_t     st = taosGetTimestampUs();
10,541,157✔
1347

1348
  if (!pRequest->parseOnly) {
10,541,157!
1349
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
10,544,109✔
1350
    if (NULL == pMnodeList) {
10,527,005!
1351
      code = terrno;
×
1352
    }
1353
    SPlanContext cxt = {.queryId = pRequest->requestId,
21,076,316✔
1354
                        .acctId = pRequest->pTscObj->acctId,
10,527,005✔
1355
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
10,527,005✔
1356
                        .pAstRoot = pQuery->pRoot,
10,549,311✔
1357
                        .showRewrite = pQuery->showRewrite,
10,549,311✔
1358
                        .isView = pWrapper->pParseCtx->isView,
10,549,311✔
1359
                        .isAudit = pWrapper->pParseCtx->isAudit,
10,549,311✔
1360
                        .pMsg = pRequest->msgBuf,
10,549,311✔
1361
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1362
                        .pUser = pRequest->pTscObj->user,
10,549,311✔
1363
                        .sysInfo = pRequest->pTscObj->sysInfo,
10,549,311✔
1364
                        .allocatorId = pRequest->allocatorRefId};
10,549,311✔
1365
    if (TSDB_CODE_SUCCESS == code) {
10,549,311✔
1366
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
10,547,056✔
1367
    }
1368
    if (code) {
10,507,911✔
1369
      tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
896!
1370
               pRequest->requestId);
1371
    } else {
1372
      pRequest->body.subplanNum = pDag->numOfSubplans;
10,507,015✔
1373
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
10,507,015✔
1374
    }
1375
  }
1376

1377
  pRequest->metric.execStart = taosGetTimestampUs();
10,493,934✔
1378
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
10,493,934✔
1379

1380
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
21,018,099!
1381
    SArray* pNodeList = NULL;
10,516,509✔
1382
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
10,516,509✔
1383
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
899,344✔
1384
    }
1385

1386
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
10,516,522✔
1387
                             .requestId = pRequest->requestId,
10,489,125✔
1388
                             .requestObjRefId = pRequest->self};
10,489,125✔
1389
    SSchedulerReq    req = {
20,970,765✔
1390
           .syncReq = false,
1391
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
10,489,125✔
1392
           .pConn = &conn,
1393
           .pNodeList = pNodeList,
1394
           .pDag = pDag,
1395
           .allocatorRefId = pRequest->allocatorRefId,
10,489,125✔
1396
           .sql = pRequest->sqlstr,
10,489,125✔
1397
           .startTs = pRequest->metric.start,
10,489,125✔
1398
           .execFp = schedulerExecCb,
1399
           .cbParam = pWrapper,
1400
           .chkKillFp = chkRequestKilled,
1401
           .chkKillParam = (void*)pRequest->self,
10,489,125✔
1402
           .pExecRes = NULL,
1403
           .source = pRequest->source,
10,489,125✔
1404
           .pWorkerCb = getTaskPoolWorkerCb(),
10,489,125✔
1405
    };
1406
    if (TSDB_CODE_SUCCESS == code) {
10,481,640!
1407
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
10,482,313✔
1408
    }
1409

1410
    taosArrayDestroy(pNodeList);
10,521,343✔
1411
  } else {
1412
    qDestroyQueryPlan(pDag);
×
1413
    tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
1,077✔
1414
             pRequest->requestId);
1415
    destorySqlCallbackWrapper(pWrapper);
1,077✔
1416
    pRequest->pWrapper = NULL;
1,077✔
1417
    if (TSDB_CODE_SUCCESS != code) {
1,077✔
1418
      pRequest->code = terrno;
896✔
1419
    }
1420

1421
    doRequestCallback(pRequest, code);
1,077✔
1422
  }
1423

1424
  // todo not to be released here
1425
  taosArrayDestroy(pMnodeList);
10,525,242✔
1426

1427
  return code;
10,528,414✔
1428
}
1429

1430
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
10,613,479✔
1431
  int32_t code = 0;
10,613,479✔
1432

1433
  if (pRequest->parseOnly) {
10,613,479✔
1434
    doRequestCallback(pRequest, 0);
418✔
1435
    return;
418✔
1436
  }
1437

1438
  pRequest->body.execMode = pQuery->execMode;
10,613,061✔
1439
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
10,613,061✔
1440
    destorySqlCallbackWrapper(pWrapper);
142,281✔
1441
    pRequest->pWrapper = NULL;
142,280✔
1442
  }
1443

1444
  if (pQuery->pRoot && !pRequest->inRetry) {
10,613,060!
1445
    STscObj*            pTscObj = pRequest->pTscObj;
10,618,552✔
1446
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
10,618,552✔
1447
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
10,618,552✔
1448
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
9,600,809✔
1449
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
9,518,844✔
1450
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
1,099,708✔
1451
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
755,582✔
1452
    }
1453
  }
1454

1455
  switch (pQuery->execMode) {
10,702,499!
1456
    case QUERY_EXEC_MODE_LOCAL:
101,639✔
1457
      asyncExecLocalCmd(pRequest, pQuery);
101,639✔
1458
      break;
101,638✔
1459
    case QUERY_EXEC_MODE_RPC:
37,375✔
1460
      code = asyncExecDdlQuery(pRequest, pQuery);
37,375✔
1461
      break;
37,377✔
1462
    case QUERY_EXEC_MODE_SCHEDULE: {
10,560,219✔
1463
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
10,560,219✔
1464
      break;
10,512,280✔
1465
    }
1466
    case QUERY_EXEC_MODE_EMPTY_RESULT:
3,266✔
1467
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
3,266✔
1468
      doRequestCallback(pRequest, 0);
3,266✔
1469
      break;
3,266✔
1470
    default:
×
1471
      tscError("0x%" PRIx64 " invalid execMode %d", pRequest->self, pQuery->execMode);
×
1472
      doRequestCallback(pRequest, -1);
×
1473
      break;
×
1474
  }
1475
}
1476

1477
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
176✔
1478
  SCatalog* pCatalog = NULL;
176✔
1479
  int32_t   code = 0;
176✔
1480
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
176✔
1481
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
176✔
1482

1483
  if (dbNum <= 0 && tblNum <= 0) {
176!
1484
    return TSDB_CODE_APP_ERROR;
176✔
1485
  }
1486

1487
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
×
1488
  if (code != TSDB_CODE_SUCCESS) {
×
1489
    return code;
×
1490
  }
1491

1492
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
×
1493
                           .requestId = pRequest->requestId,
×
1494
                           .requestObjRefId = pRequest->self,
×
1495
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
×
1496

1497
  for (int32_t i = 0; i < dbNum; ++i) {
×
1498
    char* dbFName = taosArrayGet(pRequest->dbList, i);
×
1499

1500
    // catalogRefreshDBVgInfo will handle dbFName == null.
1501
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
×
1502
    if (code != TSDB_CODE_SUCCESS) {
×
1503
      return code;
×
1504
    }
1505
  }
1506

1507
  for (int32_t i = 0; i < tblNum; ++i) {
×
1508
    SName* tableName = taosArrayGet(pRequest->tableList, i);
×
1509

1510
    // catalogRefreshTableMeta will handle tableName == null.
1511
    code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
×
1512
    if (code != TSDB_CODE_SUCCESS) {
×
1513
      return code;
×
1514
    }
1515
  }
1516

1517
  return code;
×
1518
}
1519

1520
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
28,898✔
1521
  SCatalog* pCatalog = NULL;
28,898✔
1522
  int32_t   tbNum = taosArrayGetSize(tbList);
28,898✔
1523
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
28,898✔
1524
  if (code != TSDB_CODE_SUCCESS) {
28,899!
1525
    return code;
×
1526
  }
1527

1528
  if (isView) {
28,899✔
1529
    for (int32_t i = 0; i < tbNum; ++i) {
648✔
1530
      SName* pViewName = taosArrayGet(tbList, i);
324✔
1531
      char   dbFName[TSDB_DB_FNAME_LEN];
1532
      if (NULL == pViewName) {
324!
1533
        continue;
×
1534
      }
1535
      (void)tNameGetFullDbName(pViewName, dbFName);
324✔
1536
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
324!
1537
    }
1538
  } else {
1539
    for (int32_t i = 0; i < tbNum; ++i) {
47,174✔
1540
      SName* pTbName = taosArrayGet(tbList, i);
18,599✔
1541
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
18,599!
1542
    }
1543
  }
1544

1545
  return TSDB_CODE_SUCCESS;
28,899✔
1546
}
1547

1548
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
12,128✔
1549
  pEpSet->version = 0;
12,128✔
1550

1551
  // init mnode ip set
1552
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
12,128✔
1553
  mgmtEpSet->numOfEps = 0;
12,128✔
1554
  mgmtEpSet->inUse = 0;
12,128✔
1555

1556
  if (firstEp && firstEp[0] != 0) {
12,128!
1557
    if (strlen(firstEp) >= TSDB_EP_LEN) {
12,131!
1558
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1559
      return -1;
×
1560
    }
1561

1562
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
12,131✔
1563
    if (code != TSDB_CODE_SUCCESS) {
12,132!
1564
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1565
      return terrno;
×
1566
    }
1567
    uint32_t addr = 0;
12,132✔
1568
    code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
12,132✔
1569
    if (code) {
12,133✔
1570
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
3!
1571
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1572
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
3✔
1573
    } else {
1574
      mgmtEpSet->numOfEps++;
12,130✔
1575
    }
1576
  }
1577

1578
  if (secondEp && secondEp[0] != 0) {
12,130!
1579
    if (strlen(secondEp) >= TSDB_EP_LEN) {
3,632!
1580
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1581
      return terrno;
×
1582
    }
1583

1584
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
3,632✔
1585
    if (code != TSDB_CODE_SUCCESS) {
3,632!
1586
      return code;
×
1587
    }
1588
    uint32_t addr = 0;
3,632✔
1589
    code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
3,632✔
1590
    if (code) {
3,632!
1591
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
×
1592
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1593
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
×
1594
    } else {
1595
      mgmtEpSet->numOfEps++;
3,632✔
1596
    }
1597
  }
1598

1599
  if (mgmtEpSet->numOfEps == 0) {
12,130✔
1600
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
3✔
1601
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
3✔
1602
  }
1603

1604
  return 0;
12,127✔
1605
}
1606

1607
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
12,130✔
1608
                        SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1609
  *pTscObj = NULL;
12,130✔
1610
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
12,130✔
1611
  if (TSDB_CODE_SUCCESS != code) {
12,130!
1612
    return code;
×
1613
  }
1614

1615
  SRequestObj* pRequest = NULL;
12,130✔
1616
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
12,130✔
1617
  if (TSDB_CODE_SUCCESS != code) {
12,130!
1618
    destroyTscObj(*pTscObj);
×
1619
    return code;
×
1620
  }
1621

1622
  pRequest->sqlstr = taosStrdup("taos_connect");
12,130✔
1623
  if (pRequest->sqlstr) {
12,130!
1624
    pRequest->sqlLen = strlen(pRequest->sqlstr);
12,130✔
1625
  } else {
1626
    return terrno;
×
1627
  }
1628

1629
  SMsgSendInfo* body = NULL;
12,130✔
1630
  code = buildConnectMsg(pRequest, &body);
12,130✔
1631
  if (TSDB_CODE_SUCCESS != code) {
12,130!
1632
    destroyTscObj(*pTscObj);
×
1633
    return code;
×
1634
  }
1635

1636
  // int64_t transporterId = 0;
1637
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body);
12,130✔
1638
  if (TSDB_CODE_SUCCESS != code) {
12,130!
1639
    destroyTscObj(*pTscObj);
×
1640
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1641
    return code;
×
1642
  }
1643
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
12,130!
1644
    destroyTscObj(*pTscObj);
×
1645
    tscError("failed to wait sem, code:%s", terrstr());
×
1646
    return terrno;
×
1647
  }
1648
  if (pRequest->code != TSDB_CODE_SUCCESS) {
12,130✔
1649
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
123!
1650
    tscError("failed to connect to server, reason: %s", errorMsg);
123!
1651

1652
    terrno = pRequest->code;
123✔
1653
    destroyRequest(pRequest);
123✔
1654
    taos_close_internal(*pTscObj);
123✔
1655
    *pTscObj = NULL;
123✔
1656
    return terrno;
123✔
1657
  } else {
1658
    tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p,QID:0x%" PRIx64, (*pTscObj)->id,
12,007✔
1659
             (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1660
    destroyRequest(pRequest);
12,007✔
1661
  }
1662
  return code;
12,007✔
1663
}
1664

1665
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
12,130✔
1666
  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
12,130✔
1667
  if (*pMsgSendInfo == NULL) {
12,130!
1668
    return terrno;
×
1669
  }
1670

1671
  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
12,130✔
1672

1673
  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
12,130✔
1674
  (*pMsgSendInfo)->requestId = pRequest->requestId;
12,130✔
1675
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
12,130✔
1676
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
12,130✔
1677
  if (NULL == (*pMsgSendInfo)->param) {
12,130!
1678
    taosMemoryFree(*pMsgSendInfo);
×
1679
    return terrno;
×
1680
  }
1681

1682
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
12,130✔
1683

1684
  SConnectReq connectReq = {0};
12,130✔
1685
  STscObj*    pObj = pRequest->pTscObj;
12,130✔
1686

1687
  char* db = getDbOfConnection(pObj);
12,130✔
1688
  if (db != NULL) {
12,130✔
1689
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
30✔
1690
  } else if (terrno) {
12,100!
1691
    taosMemoryFree(*pMsgSendInfo);
×
1692
    return terrno;
×
1693
  }
1694
  taosMemoryFreeClear(db);
12,130✔
1695

1696
  connectReq.connType = pObj->connType;
12,130✔
1697
  connectReq.pid = appInfo.pid;
12,130✔
1698
  connectReq.startTime = appInfo.startTime;
12,130✔
1699

1700
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
12,130✔
1701
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
12,130✔
1702
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
12,130✔
1703
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
12,130✔
1704

1705
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
12,130✔
1706
  void*   pReq = taosMemoryMalloc(contLen);
12,130✔
1707
  if (NULL == pReq) {
12,130!
1708
    taosMemoryFree(*pMsgSendInfo);
×
1709
    return terrno;
×
1710
  }
1711

1712
  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
12,130!
1713
    taosMemoryFree(*pMsgSendInfo);
×
1714
    taosMemoryFree(pReq);
×
1715
    return terrno;
×
1716
  }
1717

1718
  (*pMsgSendInfo)->msgInfo.len = contLen;
12,130✔
1719
  (*pMsgSendInfo)->msgInfo.pData = pReq;
12,130✔
1720
  return TSDB_CODE_SUCCESS;
12,130✔
1721
}
1722

1723
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
13,968,888✔
1724
  if (NULL == pEpSet) {
13,968,888✔
1725
    return;
12,499,489✔
1726
  }
1727

1728
  switch (pSendInfo->target.type) {
1,469,399✔
1729
    case TARGET_TYPE_MNODE:
24✔
1730
      if (NULL == pTscObj) {
24!
1731
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1732
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1733
        return;
×
1734
      }
1735

1736
      SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
24✔
1737
      SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
24✔
1738
      SEp*    pNewEp = &pEpSet->eps[pEpSet->inUse];
24✔
1739
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
24✔
1740
               pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
1741
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
24✔
1742
      break;
24✔
1743
    case TARGET_TYPE_VNODE: {
850,727✔
1744
      if (NULL == pTscObj) {
850,727!
1745
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1746
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1747
        return;
×
1748
      }
1749

1750
      SCatalog* pCatalog = NULL;
850,727✔
1751
      int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
850,727✔
1752
      if (code != TSDB_CODE_SUCCESS) {
850,730!
1753
        tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
×
1754
                 tstrerror(code));
1755
        return;
×
1756
      }
1757

1758
      code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
850,730✔
1759
      if (code != TSDB_CODE_SUCCESS) {
850,727!
1760
        tscError("fail to update catalog vg epset, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
×
1761
                 tstrerror(code));
1762
        return;
×
1763
      }
1764
      taosMemoryFreeClear(pSendInfo->target.dbFName);
850,728!
1765
      break;
850,728✔
1766
    }
1767
    default:
618,648✔
1768
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
618,648!
1769
      break;
619,505✔
1770
  }
1771
}
1772

1773
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
13,975,994✔
1774
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
13,975,994✔
1775
  if (pMsg->info.ahandle == NULL) {
13,975,994✔
1776
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
267!
1777
    rpcFreeCont(pMsg->pCont);
267✔
1778
    taosMemoryFree(pEpSet);
267✔
1779
    return TSDB_CODE_TSC_INTERNAL_ERROR;
267✔
1780
  }
1781

1782
  STscObj* pTscObj = NULL;
13,975,727✔
1783

1784
  STraceId* trace = &pMsg->info.traceId;
13,975,727✔
1785
  char      tbuf[40] = {0};
13,975,727✔
1786
  TRACE_TO_STR(trace, tbuf);
13,975,727!
1787

1788
  tscDebug("processMsgFromServer handle %p, message: %s, size:%d, code: %s,QID:%s", pMsg->info.handle,
13,971,613!
1789
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code), tbuf);
1790

1791
  if (pSendInfo->requestObjRefId != 0) {
13,971,614✔
1792
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
12,563,288✔
1793
    if (pRequest) {
12,559,786!
1794
      if (pRequest->self != pSendInfo->requestObjRefId) {
12,560,704!
1795
        tscError("doProcessMsgFromServer pRequest->self:%" PRId64 " != pSendInfo->requestObjRefId:%" PRId64,
×
1796
                 pRequest->self, pSendInfo->requestObjRefId);
1797

1798
        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
×
1799
          tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1800
        }
1801
        rpcFreeCont(pMsg->pCont);
×
1802
        taosMemoryFree(pEpSet);
×
1803
        destroySendMsgInfo(pSendInfo);
×
1804
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1805
      }
1806
      pTscObj = pRequest->pTscObj;
12,560,704✔
1807
    }
1808
  }
1809

1810
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
13,968,112✔
1811

1812
  SDataBuf buf = {.msgType = pMsg->msgType,
13,969,016✔
1813
                  .len = pMsg->contLen,
13,969,016✔
1814
                  .pData = NULL,
1815
                  .handle = pMsg->info.handle,
13,969,016✔
1816
                  .handleRefId = pMsg->info.refId,
13,969,016✔
1817
                  .pEpSet = pEpSet};
1818

1819
  if (pMsg->contLen > 0) {
13,969,016✔
1820
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
13,860,035✔
1821
    if (buf.pData == NULL) {
13,861,291✔
1822
      terrno = TSDB_CODE_OUT_OF_MEMORY;
894✔
1823
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
×
1824
    } else {
1825
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
13,860,397✔
1826
    }
1827
  }
1828

1829
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
13,969,378✔
1830

1831
  if (pTscObj) {
13,957,196✔
1832
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
12,547,692✔
1833
    if (TSDB_CODE_SUCCESS != code) {
12,557,209!
1834
      tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1835
      terrno = code;
×
1836
      pMsg->code = code;
×
1837
    }
1838
  }
1839

1840
  rpcFreeCont(pMsg->pCont);
13,966,713✔
1841
  destroySendMsgInfo(pSendInfo);
13,974,456✔
1842
  return TSDB_CODE_SUCCESS;
13,975,815✔
1843
}
1844
int32_t doProcessMsgFromServer(void* param) {
13,978,115✔
1845
  AsyncArg* arg = (AsyncArg*)param;
13,978,115✔
1846
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
13,978,115✔
1847
  taosMemoryFree(arg);
13,972,081✔
1848
  return code;
13,976,488✔
1849
}
1850

1851
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
13,949,770✔
1852
  int32_t code = 0;
13,949,770✔
1853
  SEpSet* tEpSet = NULL;
13,949,770✔
1854
  if (pEpSet != NULL) {
13,949,770✔
1855
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
1,470,266✔
1856
    if (NULL == tEpSet) {
1,470,265!
1857
      code = terrno;
×
1858
      pMsg->code = terrno;
×
1859
      goto _exit;
×
1860
    }
1861
    (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
1,470,265✔
1862
  }
1863

1864
  // pMsg is response msg
1865
  if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
13,949,769✔
1866
    // restore origin code
1867
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
12,130!
1868
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
×
1869
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
12,130!
1870
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
×
1871
    }
1872
  } else {
1873
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
1874
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
13,937,639!
1875
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
×
1876
    }
1877
  }
1878

1879
  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
13,949,769✔
1880
  if (NULL == arg) {
13,951,405!
1881
    code = terrno;
×
1882
    pMsg->code = code;
×
1883
    goto _exit;
×
1884
  }
1885

1886
  arg->msg = *pMsg;
13,951,405✔
1887
  arg->pEpset = tEpSet;
13,951,405✔
1888

1889
  if ((code = taosAsyncExec(doProcessMsgFromServer, arg, NULL)) != 0) {
13,951,405!
1890
    pMsg->code = code;
×
1891
    taosMemoryFree(arg);
×
1892
    goto _exit;
×
1893
  }
1894
  return;
13,970,921✔
1895
_exit:
×
1896
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
×
1897
  code = doProcessMsgFromServerImpl(pMsg, tEpSet);
×
1898
  if (code != 0) {
×
1899
    tscError("failed to sched msg to tsc, tsc ready quit");
×
1900
  }
1901
}
1902

1903
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
2✔
1904
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
2!
1905
  if (user == NULL) {
2!
1906
    user = TSDB_DEFAULT_USER;
×
1907
  }
1908

1909
  if (auth == NULL) {
2!
1910
    tscError("No auth info is given, failed to connect to server");
×
1911
    return NULL;
×
1912
  }
1913

1914
  STscObj* pObj = NULL;
2✔
1915
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
2✔
1916
  if (TSDB_CODE_SUCCESS == code) {
2✔
1917
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1✔
1918
    if (NULL == rid) {
1!
1919
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
1920
    }
1921
    *rid = pObj->id;
1✔
1922
    return (TAOS*)rid;
1✔
1923
  }
1924

1925
  return NULL;
1✔
1926
}
1927

1928
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
1929
//                      const char* db, int dbLen, uint16_t port) {
1930
//   char ipStr[TSDB_EP_LEN] = {0};
1931
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
1932
//   char userStr[TSDB_USER_LEN] = {0};
1933
//   char passStr[TSDB_PASSWORD_LEN] = {0};
1934
//
1935
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
1936
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
1937
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
1938
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
1939
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
1940
// }
1941

1942
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
20,863,517✔
1943
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
118,167,797✔
1944
    SResultColumn* pCol = &pResultInfo->pCol[i];
97,304,280✔
1945

1946
    int32_t type = pResultInfo->fields[i].type;
97,304,280✔
1947
    int32_t bytes = pResultInfo->fields[i].bytes;
97,304,280✔
1948

1949
    if (IS_VAR_DATA_TYPE(type)) {
97,304,280✔
1950
      if (!IS_VAR_NULL_TYPE(type, bytes) && pCol->offset[pResultInfo->current] != -1) {
21,923,210!
1951
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
21,260,313✔
1952

1953
        pResultInfo->length[i] = varDataLen(pStart);
21,260,313✔
1954
        pResultInfo->row[i] = varDataVal(pStart);
21,260,313✔
1955
      } else {
1956
        pResultInfo->row[i] = NULL;
662,897✔
1957
        pResultInfo->length[i] = 0;
662,897✔
1958
      }
1959
    } else {
1960
      if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
75,381,070✔
1961
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
72,852,269✔
1962
        pResultInfo->length[i] = bytes;
72,852,269✔
1963
      } else {
1964
        pResultInfo->row[i] = NULL;
2,528,801✔
1965
        pResultInfo->length[i] = 0;
2,528,801✔
1966
      }
1967
    }
1968
  }
1969
}
20,863,517✔
1970

1971
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
×
1972
  if (pRequest == NULL) {
×
1973
    return NULL;
×
1974
  }
1975

1976
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
×
1977
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
×
1978
    // All data has returned to App already, no need to try again
1979
    if (pResultInfo->completed) {
×
1980
      pResultInfo->numOfRows = 0;
×
1981
      return NULL;
×
1982
    }
1983

1984
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
×
1985
    SSchedulerReq   req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
×
1986

1987
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
×
1988
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
1989
      pResultInfo->numOfRows = 0;
×
1990
      return NULL;
×
1991
    }
1992

1993
    pRequest->code =
×
1994
        setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData, convertUcs4);
×
1995
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
1996
      pResultInfo->numOfRows = 0;
×
1997
      return NULL;
×
1998
    }
1999

2000
    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d,QID:0x%" PRIx64,
×
2001
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
2002

2003
    STscObj*            pTscObj = pRequest->pTscObj;
×
2004
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
×
2005
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
×
2006

2007
    if (pResultInfo->numOfRows == 0) {
×
2008
      return NULL;
×
2009
    }
2010
  }
2011

2012
  if (setupOneRowPtr) {
×
2013
    doSetOneRowPtr(pResultInfo);
×
2014
    pResultInfo->current += 1;
×
2015
  }
2016

2017
  return pResultInfo->row;
×
2018
}
2019

2020
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
882,722✔
2021
  tsem_t* sem = param;
882,722✔
2022
  if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
882,722!
2023
    tscError("failed to post sem, code:%s", terrstr());
×
2024
  }
2025
}
882,721✔
2026

2027
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
7,132,351✔
2028
  if (pRequest == NULL) {
7,132,351!
2029
    return NULL;
×
2030
  }
2031

2032
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
7,132,351✔
2033
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
7,132,351✔
2034
    // All data has returned to App already, no need to try again
2035
    if (pResultInfo->completed) {
1,519,116✔
2036
      pResultInfo->numOfRows = 0;
636,393✔
2037
      return NULL;
636,393✔
2038
    }
2039

2040
    // convert ucs4 to native multi-bytes string
2041
    pResultInfo->convertUcs4 = convertUcs4;
882,723✔
2042
    tsem_t sem;
2043
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
882,723!
2044
      tscError("failed to init sem, code:%s", terrstr());
×
2045
    }
2046
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
882,722✔
2047
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
882,721!
2048
      tscError("failed to wait sem, code:%s", terrstr());
×
2049
    }
2050
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
882,717!
2051
      tscError("failed to destroy sem, code:%s", terrstr());
×
2052
    }
2053
    pRequest->inCallback = false;
882,720✔
2054
  }
2055

2056
  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
6,495,955!
2057
    return NULL;
59,975✔
2058
  } else {
2059
    if (setupOneRowPtr) {
6,435,980✔
2060
      doSetOneRowPtr(pResultInfo);
5,660,512✔
2061
      pResultInfo->current += 1;
5,660,507✔
2062
    }
2063

2064
    return pResultInfo->row;
6,435,975✔
2065
  }
2066
}
2067

2068
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
1,332,292✔
2069
  if (pResInfo->row == NULL) {
1,332,292✔
2070
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
1,165,533✔
2071
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
1,165,579✔
2072
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
1,165,581✔
2073
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
1,165,588✔
2074

2075
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
1,165,580!
2076
      taosMemoryFree(pResInfo->row);
5✔
2077
      taosMemoryFree(pResInfo->pCol);
×
2078
      taosMemoryFree(pResInfo->length);
×
2079
      taosMemoryFree(pResInfo->convertBuf);
×
2080
      return terrno;
×
2081
    }
2082
  }
2083

2084
  return TSDB_CODE_SUCCESS;
1,332,334✔
2085
}
2086

2087
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength) {
1,332,262✔
2088
  int32_t idx = -1;
1,332,262✔
2089
  iconv_t conv = taosAcquireConv(&idx, C2M);
1,332,262✔
2090
  if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
1,332,297!
2091

2092
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
6,383,450✔
2093
    int32_t type = pResultInfo->fields[i].type;
5,051,092✔
2094
    int32_t bytes = pResultInfo->fields[i].bytes;
5,051,092✔
2095

2096
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
5,051,092✔
2097
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
236,815✔
2098
      if (p == NULL) {
236,815!
2099
        taosReleaseConv(idx, conv, C2M);
×
2100
        return terrno;
×
2101
      }
2102

2103
      pResultInfo->convertBuf[i] = p;
236,815✔
2104

2105
      SResultColumn* pCol = &pResultInfo->pCol[i];
236,815✔
2106
      for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
27,629,885✔
2107
        if (pCol->offset[j] != -1) {
27,393,009✔
2108
          char* pStart = pCol->offset[j] + pCol->pData;
24,474,793✔
2109

2110
          int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p), conv);
24,474,793✔
2111
          if (len < 0 || len > bytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
24,474,859!
2112
            tscError(
5!
2113
                "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
2114
                "colLength[i]):%p",
2115
                len, bytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
2116
            taosReleaseConv(idx, conv, C2M);
5✔
2117
            return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2118
          }
2119

2120
          varDataSetLen(p, len);
24,474,854✔
2121
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
24,474,854✔
2122
          p += (len + VARSTR_HEADER_SIZE);
24,474,854✔
2123
        }
2124
      }
2125

2126
      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
236,876✔
2127
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
236,876✔
2128
    }
2129
  }
2130
  taosReleaseConv(idx, conv, C2M);
1,332,358✔
2131
  return TSDB_CODE_SUCCESS;
1,332,305✔
2132
}
2133

2134
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
1,224✔
2135
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t) +
1,224✔
2136
         numOfCols * (sizeof(int8_t) + sizeof(int32_t));
2137
}
2138

2139
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
612✔
2140
  char*   p = (char*)pResultInfo->pData;
612✔
2141
  int32_t blockVersion = *(int32_t*)p;
612✔
2142

2143
  int32_t numOfRows = pResultInfo->numOfRows;
612✔
2144
  int32_t numOfCols = pResultInfo->numOfCols;
612✔
2145

2146
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2147
  // length |
2148
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
612✔
2149
  if (numOfCols != cols) {
612!
2150
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2151
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2152
  }
2153

2154
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
612✔
2155
  int32_t* colLength = (int32_t*)(p + len);
612✔
2156
  len += sizeof(int32_t) * numOfCols;
612✔
2157

2158
  char* pStart = p + len;
612✔
2159
  for (int32_t i = 0; i < numOfCols; ++i) {
2,776✔
2160
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
2,164!
2161

2162
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,164✔
2163
      int32_t* offset = (int32_t*)pStart;
644✔
2164
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
644✔
2165
      len += lenTmp;
644✔
2166
      pStart += lenTmp;
644✔
2167

2168
      int32_t estimateColLen = 0;
644✔
2169
      for (int32_t j = 0; j < numOfRows; ++j) {
2,552✔
2170
        if (offset[j] == -1) {
1,908✔
2171
          continue;
212✔
2172
        }
2173
        char* data = offset[j] + pStart;
1,696✔
2174

2175
        int32_t jsonInnerType = *data;
1,696✔
2176
        char*   jsonInnerData = data + CHAR_BYTES;
1,696✔
2177
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
1,696✔
2178
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
72✔
2179
        } else if (tTagIsJson(data)) {
1,624✔
2180
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
836✔
2181
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
788✔
2182
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
492✔
2183
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
296✔
2184
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
216✔
2185
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
80!
2186
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
80✔
2187
        } else {
2188
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
2189
          return -1;
×
2190
        }
2191
      }
2192
      len += TMAX(colLen, estimateColLen);
644✔
2193
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,520!
2194
      int32_t lenTmp = numOfRows * sizeof(int32_t);
396✔
2195
      len += (lenTmp + colLen);
396✔
2196
      pStart += lenTmp;
396✔
2197
    } else {
2198
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
1,124✔
2199
      len += (lenTmp + colLen);
1,124✔
2200
      pStart += lenTmp;
1,124✔
2201
    }
2202
    pStart += colLen;
2,164✔
2203
  }
2204

2205
  // Ensure the complete structure of the block, including the blankfill field,
2206
  // even though it is not used on the client side.
2207
  len += sizeof(bool);
612✔
2208
  return len;
612✔
2209
}
2210

2211
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
1,332,338✔
2212
  int32_t numOfRows = pResultInfo->numOfRows;
1,332,338✔
2213
  int32_t numOfCols = pResultInfo->numOfCols;
1,332,338✔
2214
  bool needConvert = false;
1,332,338✔
2215
  for (int32_t i = 0; i < numOfCols; ++i) {
6,382,988✔
2216
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
5,051,262✔
2217
      needConvert = true;
612✔
2218
      break;
612✔
2219
    }
2220
  }
2221

2222
  if (!needConvert) {
1,332,338✔
2223
    return TSDB_CODE_SUCCESS;
1,331,723✔
2224
  }
2225

2226
  tscDebug("start to convert form json format string");
615✔
2227

2228
  char*   p = (char*)pResultInfo->pData;
615✔
2229
  int32_t blockVersion = *(int32_t*)p;
615✔
2230
  int32_t dataLen = estimateJsonLen(pResultInfo);
615✔
2231
  if (dataLen <= 0) {
612!
2232
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2233
  }
2234

2235
  taosMemoryFreeClear(pResultInfo->convertJson);
612✔
2236
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
612✔
2237
  if (pResultInfo->convertJson == NULL) return terrno;
612!
2238
  char* p1 = pResultInfo->convertJson;
612✔
2239

2240
  int32_t totalLen = 0;
612✔
2241
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
612✔
2242
  if (numOfCols != cols) {
612!
2243
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2244
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2245
  }
2246

2247
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
612✔
2248
  (void)memcpy(p1, p, len);
612✔
2249

2250
  p += len;
612✔
2251
  p1 += len;
612✔
2252
  totalLen += len;
612✔
2253

2254
  len = sizeof(int32_t) * numOfCols;
612✔
2255
  int32_t* colLength = (int32_t*)p;
612✔
2256
  int32_t* colLength1 = (int32_t*)p1;
612✔
2257
  (void)memcpy(p1, p, len);
612✔
2258
  p += len;
612✔
2259
  p1 += len;
612✔
2260
  totalLen += len;
612✔
2261

2262
  char* pStart = p;
612✔
2263
  char* pStart1 = p1;
612✔
2264
  for (int32_t i = 0; i < numOfCols; ++i) {
2,776✔
2265
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
2,164!
2266
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
2,164!
2267
    if (colLen >= dataLen) {
2,164!
2268
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
×
2269
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2270
    }
2271
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,164✔
2272
      int32_t* offset = (int32_t*)pStart;
644✔
2273
      int32_t* offset1 = (int32_t*)pStart1;
644✔
2274
      len = numOfRows * sizeof(int32_t);
644✔
2275
      (void)memcpy(pStart1, pStart, len);
644✔
2276
      pStart += len;
644✔
2277
      pStart1 += len;
644✔
2278
      totalLen += len;
644✔
2279

2280
      len = 0;
644✔
2281
      for (int32_t j = 0; j < numOfRows; ++j) {
2,552✔
2282
        if (offset[j] == -1) {
1,908✔
2283
          continue;
212✔
2284
        }
2285
        char* data = offset[j] + pStart;
1,696✔
2286

2287
        int32_t jsonInnerType = *data;
1,696✔
2288
        char*   jsonInnerData = data + CHAR_BYTES;
1,696✔
2289
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
1,696✔
2290
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
1,696✔
2291
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
72✔
2292
          varDataSetLen(dst, strlen(varDataVal(dst)));
72✔
2293
        } else if (tTagIsJson(data)) {
1,624✔
2294
          char* jsonString = NULL;
836✔
2295
          parseTagDatatoJson(data, &jsonString);
836✔
2296
          if (jsonString == NULL) {
836!
2297
            tscError("doConvertJson error: parseTagDatatoJson failed");
×
2298
            return terrno;
×
2299
          }
2300
          STR_TO_VARSTR(dst, jsonString);
836✔
2301
          taosMemoryFree(jsonString);
836✔
2302
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
788✔
2303
          *(char*)varDataVal(dst) = '\"';
492✔
2304
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
492✔
2305
                                         varDataVal(dst) + CHAR_BYTES);
2306
          if (length <= 0) {
492✔
2307
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset);
4!
2308
            length = 0;
4✔
2309
          }
2310
          varDataSetLen(dst, length + CHAR_BYTES * 2);
492✔
2311
          *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
492✔
2312
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
296✔
2313
          double jsonVd = *(double*)(jsonInnerData);
216✔
2314
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
216✔
2315
          varDataSetLen(dst, strlen(varDataVal(dst)));
216✔
2316
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
80!
2317
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
80✔
2318
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
80✔
2319
          varDataSetLen(dst, strlen(varDataVal(dst)));
80✔
2320
        } else {
2321
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
×
2322
          return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2323
        }
2324

2325
        offset1[j] = len;
1,696✔
2326
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
1,696✔
2327
        len += varDataTLen(dst);
1,696✔
2328
      }
2329
      colLen1 = len;
644✔
2330
      totalLen += colLen1;
644✔
2331
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
644!
2332
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,520!
2333
      len = numOfRows * sizeof(int32_t);
396✔
2334
      (void)memcpy(pStart1, pStart, len);
396✔
2335
      pStart += len;
396✔
2336
      pStart1 += len;
396✔
2337
      totalLen += len;
396✔
2338
      totalLen += colLen;
396✔
2339
      (void)memcpy(pStart1, pStart, colLen);
396✔
2340
    } else {
2341
      len = BitmapLen(pResultInfo->numOfRows);
1,124✔
2342
      (void)memcpy(pStart1, pStart, len);
1,124✔
2343
      pStart += len;
1,124✔
2344
      pStart1 += len;
1,124✔
2345
      totalLen += len;
1,124✔
2346
      totalLen += colLen;
1,124✔
2347
      (void)memcpy(pStart1, pStart, colLen);
1,124✔
2348
    }
2349
    pStart += colLen;
2,164✔
2350
    pStart1 += colLen1;
2,164✔
2351
  }
2352

2353
  // Ensure the complete structure of the block, including the blankfill field,
2354
  // even though it is not used on the client side.
2355
  // (void)memcpy(pStart1, pStart, sizeof(bool));
2356
  totalLen += sizeof(bool);
612✔
2357

2358
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
612✔
2359
  pResultInfo->pData = pResultInfo->convertJson;
612✔
2360
  return TSDB_CODE_SUCCESS;
612✔
2361
}
2362

2363
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) {
1,397,304✔
2364
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
1,397,304!
2365
    tscError("setResultDataPtr paras error");
×
2366
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2367
  }
2368

2369
  if (pResultInfo->numOfRows == 0) {
1,397,310✔
2370
    return TSDB_CODE_SUCCESS;
65,011✔
2371
  }
2372

2373
  if (pResultInfo->pData == NULL) {
1,332,299!
2374
    tscError("setResultDataPtr error: pData is NULL");
×
2375
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2376
  }
2377

2378
  int32_t code = doPrepareResPtr(pResultInfo);
1,332,299✔
2379
  if (code != TSDB_CODE_SUCCESS) {
1,332,333!
2380
    return code;
×
2381
  }
2382
  code = doConvertJson(pResultInfo);
1,332,333✔
2383
  if (code != TSDB_CODE_SUCCESS) {
1,332,329!
2384
    return code;
×
2385
  }
2386

2387
  char* p = (char*)pResultInfo->pData;
1,332,329✔
2388

2389
  // version:
2390
  int32_t blockVersion = *(int32_t*)p;
1,332,329✔
2391
  p += sizeof(int32_t);
1,332,329✔
2392

2393
  int32_t dataLen = *(int32_t*)p;
1,332,329✔
2394
  p += sizeof(int32_t);
1,332,329✔
2395

2396
  int32_t rows = *(int32_t*)p;
1,332,329✔
2397
  p += sizeof(int32_t);
1,332,329✔
2398

2399
  int32_t cols = *(int32_t*)p;
1,332,329✔
2400
  p += sizeof(int32_t);
1,332,329✔
2401

2402
  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
1,332,329!
2403
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows, pResultInfo->numOfRows, cols,
×
2404
             pResultInfo->numOfCols);
2405
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2406
  }
2407

2408
  int32_t hasColumnSeg = *(int32_t*)p;
1,332,329✔
2409
  p += sizeof(int32_t);
1,332,329✔
2410

2411
  uint64_t groupId = *(uint64_t*)p;
1,332,329✔
2412
  p += sizeof(uint64_t);
1,332,329✔
2413

2414
  // check fields
2415
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
6,383,636✔
2416
    int8_t type = *(int8_t*)p;
5,051,307✔
2417
    p += sizeof(int8_t);
5,051,307✔
2418

2419
    int32_t bytes = *(int32_t*)p;
5,051,307✔
2420
    p += sizeof(int32_t);
5,051,307✔
2421
  }
2422

2423
  int32_t* colLength = (int32_t*)p;
1,332,329✔
2424
  p += sizeof(int32_t) * pResultInfo->numOfCols;
1,332,329✔
2425

2426
  char* pStart = p;
1,332,329✔
2427
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
6,383,243✔
2428
    if ((pStart - pResultInfo->pData) >= dataLen) {
5,050,939!
2429
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
×
2430
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2431
    }
2432
    if (blockVersion == BLOCK_VERSION_1) {
5,050,939✔
2433
      colLength[i] = htonl(colLength[i]);
3,147,505✔
2434
    }
2435
    if (colLength[i] >= dataLen) {
5,050,939!
2436
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
×
2437
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2438
    }
2439
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
5,050,939✔
2440
      tscError("invalid type %d", pResultInfo->fields[i].type);
25!
2441
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2442
    }
2443
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
5,050,914✔
2444
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
1,261,724✔
2445
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
1,261,724✔
2446
    } else {
2447
      pResultInfo->pCol[i].nullbitmap = pStart;
3,789,190✔
2448
      pStart += BitmapLen(pResultInfo->numOfRows);
3,789,190✔
2449
    }
2450

2451
    pResultInfo->pCol[i].pData = pStart;
5,050,914✔
2452
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
5,050,914✔
2453
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
5,050,914✔
2454

2455
    pStart += colLength[i];
5,050,914✔
2456
  }
2457

2458
  p = pStart;
1,332,304✔
2459
  // bool blankFill = *(bool*)p;
2460
  p += sizeof(bool);
1,332,304✔
2461
  int32_t offset = p - pResultInfo->pData;
1,332,304✔
2462
  if (offset > dataLen) {
1,332,304!
2463
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
×
2464
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2465
  }
2466

2467
  if (convertUcs4) {
1,332,304✔
2468
    code = doConvertUCS4(pResultInfo, colLength);
1,332,261✔
2469
  }
2470

2471
  return code;
1,332,333✔
2472
}
2473

2474
char* getDbOfConnection(STscObj* pObj) {
10,867,344✔
2475
  terrno = TSDB_CODE_SUCCESS;
10,867,344✔
2476
  char* p = NULL;
10,864,706✔
2477
  (void)taosThreadMutexLock(&pObj->mutex);
10,864,706✔
2478
  size_t len = strlen(pObj->db);
10,874,996✔
2479
  if (len > 0) {
10,874,996✔
2480
    p = taosStrndup(pObj->db, tListLen(pObj->db));
10,562,817✔
2481
    if (p == NULL) {
10,556,387!
2482
      tscError("failed to taosStrndup db name");
×
2483
    }
2484
  }
2485

2486
  (void)taosThreadMutexUnlock(&pObj->mutex);
10,868,566✔
2487
  return p;
10,876,140✔
2488
}
2489

2490
void setConnectionDB(STscObj* pTscObj, const char* db) {
9,407✔
2491
  if (db == NULL || pTscObj == NULL) {
9,407!
2492
    tscError("setConnectionDB para is NULL");
×
2493
    return;
×
2494
  }
2495

2496
  (void)taosThreadMutexLock(&pTscObj->mutex);
9,407✔
2497
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
9,408✔
2498
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
9,408✔
2499
}
2500

2501
void resetConnectDB(STscObj* pTscObj) {
×
2502
  if (pTscObj == NULL) {
×
2503
    return;
×
2504
  }
2505

2506
  (void)taosThreadMutexLock(&pTscObj->mutex);
×
2507
  pTscObj->db[0] = 0;
×
2508
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
×
2509
}
2510

2511
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
1,041,785✔
2512
  if (pResultInfo == NULL || pRsp == NULL) {
1,041,785!
2513
    tscError("setQueryResultFromRsp paras is null");
×
2514
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2515
  }
2516

2517
  taosMemoryFreeClear(pResultInfo->pRspMsg);
1,041,791✔
2518
  pResultInfo->pRspMsg = (const char*)pRsp;
1,041,791✔
2519
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
1,041,791✔
2520
  pResultInfo->current = 0;
1,041,796✔
2521
  pResultInfo->completed = (pRsp->completed == 1);
1,041,796✔
2522
  pResultInfo->precision = pRsp->precision;
1,041,796✔
2523

2524
  // decompress data if needed
2525
  int32_t payloadLen = htonl(pRsp->payloadLen);
1,041,796✔
2526

2527
  if (pRsp->compressed) {
1,041,796✔
2528
    if (pResultInfo->decompBuf == NULL) {
650✔
2529
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
14✔
2530
      if (pResultInfo->decompBuf == NULL) {
14!
2531
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2532
        return terrno;
×
2533
      }
2534
      pResultInfo->decompBufSize = payloadLen;
14✔
2535
    } else {
2536
      if (pResultInfo->decompBufSize < payloadLen) {
636✔
2537
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
17✔
2538
        if (p == NULL) {
17!
2539
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2540
          return terrno;
×
2541
        }
2542

2543
        pResultInfo->decompBuf = p;
17✔
2544
        pResultInfo->decompBufSize = payloadLen;
17✔
2545
      }
2546
    }
2547
  }
2548

2549
  if (payloadLen > 0) {
1,041,796✔
2550
    int32_t compLen = *(int32_t*)pRsp->data;
976,781✔
2551
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
976,781✔
2552

2553
    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
976,781✔
2554

2555
    if (pRsp->compressed && compLen < rawLen) {
976,781!
2556
      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
650✔
2557
      if (len < 0) {
650!
2558
        tscError("tsDecompressString failed");
×
2559
        return terrno ? terrno : TSDB_CODE_FAILED;
×
2560
      }
2561
      if (len != rawLen) {
650!
2562
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
×
2563
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2564
      }
2565
      pResultInfo->pData = pResultInfo->decompBuf;
650✔
2566
      pResultInfo->payloadLen = rawLen;
650✔
2567
    } else {
2568
      pResultInfo->pData = pStart;
976,131✔
2569
      pResultInfo->payloadLen = htonl(pRsp->compLen);
976,131✔
2570
      if (pRsp->compLen != pRsp->payloadLen) {
976,131!
2571
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
×
2572
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2573
      }
2574
    }
2575
  }
2576

2577
  // TODO handle the compressed case
2578
  pResultInfo->totalRows += pResultInfo->numOfRows;
1,041,796✔
2579

2580
  int32_t code =
2581
      setResultDataPtr(pResultInfo, convertUcs4);
1,041,796✔
2582
  return code;
1,041,801✔
2583
}
2584

2585
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
3✔
2586
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
3✔
2587
  void*              clientRpc = NULL;
3✔
2588
  SServerStatusRsp   statusRsp = {0};
3✔
2589
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
3✔
2590
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
3✔
2591
  SRpcMsg  rpcRsp = {0};
3✔
2592
  SRpcInit rpcInit = {0};
3✔
2593
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
3✔
2594

2595
  rpcInit.label = "CHK";
3✔
2596
  rpcInit.numOfThreads = 1;
3✔
2597
  rpcInit.cfp = NULL;
3✔
2598
  rpcInit.sessions = 16;
3✔
2599
  rpcInit.connType = TAOS_CONN_CLIENT;
3✔
2600
  rpcInit.idleTime = tsShellActivityTimer * 1000;
3✔
2601
  rpcInit.compressSize = tsCompressMsgSize;
3✔
2602
  rpcInit.user = "_dnd";
3✔
2603

2604
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
3✔
2605
  connLimitNum = TMAX(connLimitNum, 10);
3✔
2606
  connLimitNum = TMIN(connLimitNum, 500);
3✔
2607
  rpcInit.connLimitNum = connLimitNum;
3✔
2608
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
3✔
2609
  rpcInit.readTimeout = tsReadTimeout;
3✔
2610
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
3!
2611
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2612
    goto _OVER;
×
2613
  }
2614

2615
  clientRpc = rpcOpen(&rpcInit);
3✔
2616
  if (clientRpc == NULL) {
3!
2617
    code = terrno;
×
2618
    tscError("failed to init server status client since %s", tstrerror(code));
×
2619
    goto _OVER;
×
2620
  }
2621

2622
  if (fqdn == NULL) {
3!
2623
    fqdn = tsLocalFqdn;
3✔
2624
  }
2625

2626
  if (port == 0) {
3!
2627
    port = tsServerPort;
3✔
2628
  }
2629

2630
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
3✔
2631
  epSet.eps[0].port = (uint16_t)port;
3✔
2632
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
3✔
2633
  if (TSDB_CODE_SUCCESS != ret) {
3!
2634
    tscError("failed to send recv since %s", tstrerror(ret));
×
2635
    goto _OVER;
×
2636
  }
2637

2638
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
3!
2639
    tscError("failed to send server status req since %s", terrstr());
1!
2640
    goto _OVER;
1✔
2641
  }
2642

2643
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
2!
2644
    tscError("failed to parse server status rsp since %s", terrstr());
×
2645
    goto _OVER;
×
2646
  }
2647

2648
  code = statusRsp.statusCode;
2✔
2649
  if (details != NULL) {
2!
2650
    tstrncpy(details, statusRsp.details, maxlen);
2✔
2651
  }
2652

2653
_OVER:
×
2654
  if (clientRpc != NULL) {
3!
2655
    rpcClose(clientRpc);
3✔
2656
  }
2657
  if (rpcRsp.pCont != NULL) {
3✔
2658
    rpcFreeCont(rpcRsp.pCont);
2✔
2659
  }
2660
  return code;
3✔
2661
}
2662

2663
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
3✔
2664
                      int32_t acctId, char* db) {
2665
  SName name = {0};
3✔
2666

2667
  if (len1 <= 0) {
3!
2668
    return -1;
×
2669
  }
2670

2671
  const char* dbName = db;
3✔
2672
  const char* tbName = NULL;
3✔
2673
  int32_t     dbLen = 0;
3✔
2674
  int32_t     tbLen = 0;
3✔
2675
  if (len2 > 0) {
3!
2676
    dbName = str + pos1;
×
2677
    dbLen = len1;
×
2678
    tbName = str + pos2;
×
2679
    tbLen = len2;
×
2680
  } else {
2681
    dbLen = strlen(db);
3✔
2682
    tbName = str + pos1;
3✔
2683
    tbLen = len1;
3✔
2684
  }
2685

2686
  if (dbLen <= 0 || tbLen <= 0) {
3!
2687
    return -1;
×
2688
  }
2689

2690
  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
3!
2691
    return -1;
×
2692
  }
2693

2694
  if (tNameAddTbName(&name, tbName, tbLen)) {
3!
2695
    return -1;
×
2696
  }
2697

2698
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
3✔
2699
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);
3✔
2700

2701
  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
3✔
2702
  if (pDb) {
3!
2703
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
×
2704
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
2705
    }
2706
  } else {
2707
    STablesReq db;
2708
    db.pTables = taosArrayInit(20, sizeof(SName));
3✔
2709
    if (NULL == db.pTables) {
3!
2710
      return terrno;
×
2711
    }
2712
    tstrncpy(db.dbFName, dbFName, TSDB_DB_FNAME_LEN);
3✔
2713
    if (NULL == taosArrayPush(db.pTables, &name)) {
6!
2714
      return terrno;
×
2715
    }
2716
    TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
3!
2717
  }
2718

2719
  return TSDB_CODE_SUCCESS;
3✔
2720
}
2721

2722
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
3✔
2723
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
3✔
2724
  if (NULL == pHash) {
3!
2725
    return terrno;
×
2726
  }
2727

2728
  bool    inEscape = false;
3✔
2729
  int32_t code = 0;
3✔
2730
  void*   pIter = NULL;
3✔
2731

2732
  int32_t vIdx = 0;
3✔
2733
  int32_t vPos[2];
2734
  int32_t vLen[2];
2735

2736
  (void)memset(vPos, -1, sizeof(vPos));
3✔
2737
  (void)memset(vLen, 0, sizeof(vLen));
3✔
2738

2739
  for (int32_t i = 0;; ++i) {
12✔
2740
    if (0 == *(tbList + i)) {
12✔
2741
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
3!
2742
        vLen[vIdx] = i - vPos[vIdx];
3✔
2743
      }
2744

2745
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
3✔
2746
      if (code) {
3!
2747
        goto _return;
×
2748
      }
2749

2750
      break;
3✔
2751
    }
2752

2753
    if ('`' == *(tbList + i)) {
9!
2754
      inEscape = !inEscape;
×
2755
      if (!inEscape) {
×
2756
        if (vPos[vIdx] >= 0) {
×
2757
          vLen[vIdx] = i - vPos[vIdx];
×
2758
        } else {
2759
          goto _return;
×
2760
        }
2761
      }
2762

2763
      continue;
×
2764
    }
2765

2766
    if (inEscape) {
9!
2767
      if (vPos[vIdx] < 0) {
×
2768
        vPos[vIdx] = i;
×
2769
      }
2770
      continue;
×
2771
    }
2772

2773
    if ('.' == *(tbList + i)) {
9!
2774
      if (vPos[vIdx] < 0) {
×
2775
        goto _return;
×
2776
      }
2777
      if (vLen[vIdx] <= 0) {
×
2778
        vLen[vIdx] = i - vPos[vIdx];
×
2779
      }
2780
      vIdx++;
×
2781
      if (vIdx >= 2) {
×
2782
        goto _return;
×
2783
      }
2784
      continue;
×
2785
    }
2786

2787
    if (',' == *(tbList + i)) {
9!
2788
      if (vPos[vIdx] < 0) {
×
2789
        goto _return;
×
2790
      }
2791
      if (vLen[vIdx] <= 0) {
×
2792
        vLen[vIdx] = i - vPos[vIdx];
×
2793
      }
2794

2795
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
×
2796
      if (code) {
×
2797
        goto _return;
×
2798
      }
2799

2800
      (void)memset(vPos, -1, sizeof(vPos));
×
2801
      (void)memset(vLen, 0, sizeof(vLen));
×
2802
      vIdx = 0;
×
2803
      continue;
×
2804
    }
2805

2806
    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
9!
2807
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
×
2808
        vLen[vIdx] = i - vPos[vIdx];
×
2809
      }
2810
      continue;
×
2811
    }
2812

2813
    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
9!
2814
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
×
2815
      if (vLen[vIdx] > 0) {
9!
2816
        goto _return;
×
2817
      }
2818
      if (vPos[vIdx] < 0) {
9✔
2819
        vPos[vIdx] = i;
3✔
2820
      }
2821
      continue;
9✔
2822
    }
2823

2824
    goto _return;
×
2825
  }
2826

2827
  int32_t dbNum = taosHashGetSize(pHash);
3✔
2828
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
3✔
2829
  if (NULL == pReq) {
3!
2830
    TSC_ERR_JRET(terrno);
×
2831
  }
2832
  pIter = taosHashIterate(pHash, NULL);
3✔
2833
  while (pIter) {
6✔
2834
    STablesReq* pDb = (STablesReq*)pIter;
3✔
2835
    if (NULL == taosArrayPush(*pReq, pDb)) {
6!
2836
      TSC_ERR_JRET(terrno);
×
2837
    }
2838
    pIter = taosHashIterate(pHash, pIter);
3✔
2839
  }
2840

2841
  taosHashCleanup(pHash);
3✔
2842

2843
  return TSDB_CODE_SUCCESS;
3✔
2844

2845
_return:
×
2846

2847
  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
×
2848

2849
  pIter = taosHashIterate(pHash, NULL);
×
2850
  while (pIter) {
×
2851
    STablesReq* pDb = (STablesReq*)pIter;
×
2852
    taosArrayDestroy(pDb->pTables);
×
2853
    pIter = taosHashIterate(pHash, pIter);
×
2854
  }
2855

2856
  taosHashCleanup(pHash);
×
2857

2858
  return terrno;
×
2859
}
2860

2861
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
3✔
2862
  SSyncQueryParam* pParam = param;
3✔
2863
  pParam->pRequest->code = code;
3✔
2864

2865
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
3!
2866
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2867
  }
2868
}
3✔
2869

2870
void syncQueryFn(void* param, void* res, int32_t code) {
10,836,476✔
2871
  SSyncQueryParam* pParam = param;
10,836,476✔
2872
  pParam->pRequest = res;
10,836,476✔
2873

2874
  if (pParam->pRequest) {
10,836,476✔
2875
    pParam->pRequest->code = code;
10,835,548✔
2876
    clientOperateReport(pParam->pRequest);
10,835,548✔
2877
  }
2878

2879
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
10,838,736!
2880
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2881
  }
2882
}
10,846,225✔
2883

2884
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
10,840,639✔
2885
                        int8_t source) {
2886
  if (sql == NULL || NULL == fp) {
10,840,639!
2887
    terrno = TSDB_CODE_INVALID_PARA;
×
2888
    if (fp) {
×
2889
      fp(param, NULL, terrno);
×
2890
    }
2891

2892
    return;
1✔
2893
  }
2894

2895
  size_t sqlLen = strlen(sql);
10,845,388✔
2896
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
10,845,388✔
2897
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
1!
2898
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
1✔
2899
    fp(param, NULL, terrno);
1✔
2900
    return;
1✔
2901
  }
2902

2903
  SRequestObj* pRequest = NULL;
10,845,387✔
2904
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
10,845,387✔
2905
  if (code != TSDB_CODE_SUCCESS) {
10,837,776!
2906
    terrno = code;
×
2907
    fp(param, NULL, terrno);
×
2908
    return;
×
2909
  }
2910

2911
  pRequest->source = source;
10,837,776✔
2912
  pRequest->body.queryFp = fp;
10,837,776✔
2913
  doAsyncQuery(pRequest, false);
10,837,776✔
2914
}
2915
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
×
2916
                                 int64_t reqid) {
2917
  if (sql == NULL || NULL == fp) {
×
2918
    terrno = TSDB_CODE_INVALID_PARA;
×
2919
    if (fp) {
×
2920
      fp(param, NULL, terrno);
×
2921
    }
2922

2923
    return;
×
2924
  }
2925

2926
  size_t sqlLen = strlen(sql);
×
2927
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
×
2928
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
×
2929
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
2930
    fp(param, NULL, terrno);
×
2931
    return;
×
2932
  }
2933

2934
  SRequestObj* pRequest = NULL;
×
2935
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
×
2936
  if (code != TSDB_CODE_SUCCESS) {
×
2937
    terrno = code;
×
2938
    fp(param, NULL, terrno);
×
2939
    return;
×
2940
  }
2941

2942
  pRequest->body.queryFp = fp;
×
2943
  doAsyncQuery(pRequest, false);
×
2944
}
2945

2946
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
10,841,634✔
2947
  if (NULL == taos) {
10,841,634!
2948
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
2949
    return NULL;
×
2950
  }
2951

2952
  tscDebug("taos_query start with sql:%s", sql);
10,841,634✔
2953

2954
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
10,841,634✔
2955
  if (NULL == param) {
10,843,354!
2956
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2957
    return NULL;
×
2958
  }
2959
  int32_t code = tsem_init(&param->sem, 0, 0);
10,843,354✔
2960
  if (TSDB_CODE_SUCCESS != code) {
10,843,000!
2961
    taosMemoryFree(param);
×
2962
    return NULL;
×
2963
  }
2964

2965
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
10,843,000✔
2966
  code = tsem_wait(&param->sem);
10,790,112✔
2967
  if (TSDB_CODE_SUCCESS != code) {
10,843,209!
2968
    taosMemoryFree(param);
×
2969
    return NULL;
×
2970
  }
2971
  code = tsem_destroy(&param->sem);
10,843,209✔
2972
  if (TSDB_CODE_SUCCESS != code) {
10,841,846!
2973
    tscError("failed to destroy semaphore since %s", tstrerror(code));
×
2974
  }
2975

2976
  SRequestObj* pRequest = NULL;
10,841,929✔
2977
  if (param->pRequest != NULL) {
10,841,929✔
2978
    param->pRequest->syncQuery = true;
10,841,928✔
2979
    pRequest = param->pRequest;
10,841,928✔
2980
    param->pRequest->inCallback = false;
10,841,928✔
2981
  }
2982
  taosMemoryFree(param);
10,841,929✔
2983

2984
  tscDebug("taos_query end with sql:%s", sql);
10,845,299✔
2985

2986
  return pRequest;
10,846,030✔
2987
}
2988

2989
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
×
2990
  if (NULL == taos) {
×
2991
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
2992
    return NULL;
×
2993
  }
2994

2995
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
×
2996
  if (param == NULL) {
×
2997
    terrno = TSDB_CODE_OUT_OF_MEMORY;
×
2998
    return NULL;
×
2999
  }
3000
  int32_t code = tsem_init(&param->sem, 0, 0);
×
3001
  if (TSDB_CODE_SUCCESS != code) {
×
3002
    taosMemoryFree(param);
×
3003
    return NULL;
×
3004
  }
3005

3006
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
×
3007
  code = tsem_wait(&param->sem);
×
3008
  if (TSDB_CODE_SUCCESS != code) {
×
3009
    taosMemoryFree(param);
×
3010
    return NULL;
×
3011
  }
3012
  SRequestObj* pRequest = NULL;
×
3013
  if (param->pRequest != NULL) {
×
3014
    param->pRequest->syncQuery = true;
×
3015
    pRequest = param->pRequest;
×
3016
  }
3017
  taosMemoryFree(param);
×
3018
  return pRequest;
×
3019
}
3020

3021
static void fetchCallback(void* pResult, void* param, int32_t code) {
963,713✔
3022
  SRequestObj* pRequest = (SRequestObj*)param;
963,713✔
3023

3024
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
963,713✔
3025

3026
  tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
963,713✔
3027
           pRequest->requestId);
3028

3029
  pResultInfo->pData = pResult;
963,705✔
3030
  pResultInfo->numOfRows = 0;
963,705✔
3031

3032
  if (code != TSDB_CODE_SUCCESS) {
963,705!
3033
    pRequest->code = code;
×
3034
    taosMemoryFreeClear(pResultInfo->pData);
×
3035
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3036
    return;
×
3037
  }
3038

3039
  if (pRequest->code != TSDB_CODE_SUCCESS) {
963,705!
3040
    taosMemoryFreeClear(pResultInfo->pData);
×
3041
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3042
    return;
×
3043
  }
3044

3045
  pRequest->code =
963,723✔
3046
      setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4);
963,705✔
3047
  if (pRequest->code != TSDB_CODE_SUCCESS) {
963,723!
3048
    pResultInfo->numOfRows = 0;
×
3049
    tscError("0x%" PRIx64 " fetch results failed, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(pRequest->code),
×
3050
             pRequest->requestId);
3051
  } else {
3052
    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d,QID:0x%" PRIx64,
963,723✔
3053
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
3054
             pRequest->requestId);
3055

3056
    STscObj*            pTscObj = pRequest->pTscObj;
963,723✔
3057
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
963,723✔
3058
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
963,723✔
3059
  }
3060

3061
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
963,730✔
3062
}
3063

3064
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
1,038,758✔
3065
  pRequest->body.fetchFp = fp;
1,038,758✔
3066
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
1,038,758✔
3067

3068
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,038,758✔
3069

3070
  // this query has no results or error exists, return directly
3071
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
1,038,758!
3072
    pResultInfo->numOfRows = 0;
1✔
3073
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
1✔
3074
    return;
75,027✔
3075
  }
3076

3077
  // all data has returned to App already, no need to try again
3078
  if (pResultInfo->completed) {
1,038,756✔
3079
    // it is a local executed query, no need to do async fetch
3080
    if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
75,027✔
3081
      if (pResultInfo->localResultFetched) {
1,694✔
3082
        pResultInfo->numOfRows = 0;
847✔
3083
        pResultInfo->current = 0;
847✔
3084
      } else {
3085
        pResultInfo->localResultFetched = true;
847✔
3086
      }
3087
    } else {
3088
      pResultInfo->numOfRows = 0;
73,333✔
3089
    }
3090

3091
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
75,027✔
3092
    return;
75,027✔
3093
  }
3094

3095
  SSchedulerReq req = {
963,729✔
3096
      .syncReq = false,
3097
      .fetchFp = fetchCallback,
3098
      .cbParam = pRequest,
3099
  };
3100

3101
  int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
963,729✔
3102
  if (TSDB_CODE_SUCCESS != code) {
963,729!
3103
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
×
3104
    // pRequest->body.fetchFp(param, pRequest, code);
3105
  }
3106
}
3107

3108
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
10,838,418✔
3109
  pRequest->inCallback = true;
10,838,418✔
3110
  int64_t this = pRequest->self;
10,838,418✔
3111
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
10,838,418!
3112
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
×
3113
    code = TSDB_CODE_SUCCESS;
×
3114
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
3115
  }
3116
  pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
10,838,418✔
3117
  SRequestObj* pReq = acquireRequest(this);
10,846,603✔
3118
  if (pReq != NULL) {
10,844,670✔
3119
    pReq->inCallback = false;
10,843,144✔
3120
    (void)releaseRequest(this);
10,843,144✔
3121
  }
3122
}
10,838,281✔
3123

3124
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
667✔
3125
                       SParseSqlRes* pRes) {
3126
#ifndef TD_ENTERPRISE
3127
  return TSDB_CODE_SUCCESS;
3128
#else
3129
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
667✔
3130
#endif
3131
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc