• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3839

03 Apr 2025 01:31PM UTC coverage: 62.382% (+0.4%) from 61.998%
#3839

push

travis-ci

web-flow
Merge pull request #30644 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

154072 of 315065 branches covered (48.9%)

Branch coverage included in aggregate %.

397 of 459 new or added lines in 2 files covered. (86.49%)

395 existing lines in 53 files now uncovered.

238969 of 314991 relevant lines covered (75.87%)

20218564.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.66
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "scheduler.h"
22
#include "tdatablock.h"
23
#include "tdataformat.h"
24
#include "tdef.h"
25
#include "tglobal.h"
26
#include "tmsgtype.h"
27
#include "tpagedbuf.h"
28
#include "tref.h"
29
#include "tsched.h"
30
#include "tversion.h"
31
#include "decimal.h"
32

33
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
34
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
35

36
void setQueryRequest(int64_t rId) {
1,617,279✔
37
  SRequestObj* pReq = acquireRequest(rId);
1,617,279✔
38
  if (pReq != NULL) {
1,617,286✔
39
    pReq->isQuery = true;
1,617,285✔
40
    (void)releaseRequest(rId);
1,617,285✔
41
  }
42
}
1,617,274✔
43

44
static bool stringLengthCheck(const char* str, size_t maxsize) {
64,595✔
45
  if (str == NULL) {
64,595!
46
    return false;
×
47
  }
48

49
  size_t len = strlen(str);
64,595✔
50
  if (len <= 0 || len > maxsize) {
64,595!
51
    return false;
×
52
  }
53

54
  return true;
64,599✔
55
}
56

57
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
31,749✔
58

59
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_MAX_LEN); }
31,745✔
60

61
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
1,102✔
62

63
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
31,749✔
64
  char key[512] = {0};
31,749✔
65
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
31,749✔
66
  return taosStrdup(key);
31,749!
67
}
68

69
bool chkRequestKilled(void* param) {
35,967,396✔
70
  bool         killed = false;
35,967,396✔
71
  SRequestObj* pRequest = acquireRequest((int64_t)param);
35,967,396✔
72
  if (NULL == pRequest || pRequest->killed) {
36,548,972!
73
    killed = true;
×
74
  }
75

76
  (void)releaseRequest((int64_t)param);
36,548,972✔
77

78
  return killed;
36,471,263✔
79
}
80

81
void cleanupAppInfo() {
16,664✔
82
  taosHashCleanup(appInfo.pInstMap);
16,664✔
83
  taosHashCleanup(appInfo.pInstMapByClusterId);
16,664✔
84
}
16,664✔
85

86
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
87
                               SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
88

89
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
31,749✔
90
                              uint16_t port, int connType, STscObj** pObj) {
91
  TSC_ERR_RET(taos_init());
31,749!
92
  if (!validateUserName(user)) {
31,751!
93
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
×
94
  }
95

96
  char localDb[TSDB_DB_NAME_LEN] = {0};
31,751✔
97
  if (db != NULL && strlen(db) > 0) {
31,751✔
98
    if (!validateDbName(db)) {
1,102!
99
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
×
100
    }
101

102
    tstrncpy(localDb, db, sizeof(localDb));
1,102✔
103
    (void)strdequote(localDb);
1,102✔
104
  }
105

106
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
31,749✔
107
  if (auth == NULL) {
31,749✔
108
    if (!validatePassword(pass)) {
31,745!
109
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
×
110
    }
111

112
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
31,748✔
113
  } else {
114
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
4✔
115
  }
116

117
  SCorEpSet epSet = {0};
31,749✔
118
  if (ip) {
31,749✔
119
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
11,892✔
120
  } else {
121
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
19,857!
122
  }
123

124
  if (port) {
31,749✔
125
    epSet.epSet.eps[0].port = port;
607✔
126
    epSet.epSet.eps[1].port = port;
607✔
127
  }
128

129
  char* key = getClusterKey(user, secretEncrypt, ip, port);
31,749✔
130
  if (NULL == key) {
31,748!
131
    TSC_ERR_RET(terrno);
×
132
  }
133
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
31,748!
134
          user, db, key);
135
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
83,359✔
136
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
51,610!
137
  }
138

139
  SAppInstInfo** pInst = NULL;
31,749✔
140
  int32_t        code = taosThreadMutexLock(&appInfo.mutex);
31,749✔
141
  if (TSDB_CODE_SUCCESS != code) {
31,749!
142
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
143
    TSC_ERR_RET(code);
×
144
  }
145

146
  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
31,749✔
147
  SAppInstInfo* p = NULL;
31,749✔
148
  if (pInst == NULL) {
31,749✔
149
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
16,925!
150
    if (NULL == p) {
16,925!
151
      TSC_ERR_JRET(terrno);
×
152
    }
153
    p->mgmtEp = epSet;
16,925✔
154
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
16,925✔
155
    if (TSDB_CODE_SUCCESS != code) {
16,925!
156
      taosMemoryFree(p);
×
157
      TSC_ERR_JRET(code);
×
158
    }
159
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
16,925✔
160
    if (TSDB_CODE_SUCCESS != code) {
16,925!
161
      taosMemoryFree(p);
×
162
      TSC_ERR_JRET(code);
×
163
    }
164
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
16,925✔
165
    if (TSDB_CODE_SUCCESS != code) {
16,925!
166
      destroyAppInst(&p);
×
167
      TSC_ERR_JRET(code);
×
168
    }
169
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
16,925✔
170
    if (TSDB_CODE_SUCCESS != code) {
16,925!
171
      destroyAppInst(&p);
×
172
      TSC_ERR_JRET(code);
×
173
    }
174
    p->instKey = key;
16,925✔
175
    key = NULL;
16,925✔
176
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
16,925!
177

178
    pInst = &p;
16,925✔
179
  } else {
180
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
14,824!
181
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
×
182
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
×
183
    }
184
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
185
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
14,824✔
186
  }
187

188
_return:
31,749✔
189

190
  if (TSDB_CODE_SUCCESS != code) {
31,749!
191
    (void)taosThreadMutexUnlock(&appInfo.mutex);
×
192
    taosMemoryFreeClear(key);
×
193
    return code;
×
194
  } else {
195
    code = taosThreadMutexUnlock(&appInfo.mutex);
31,749✔
196
    taosMemoryFreeClear(key);
31,749!
197
    if (TSDB_CODE_SUCCESS != code) {
31,749!
198
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
199
      return code;
×
200
    }
201
    return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType, pObj);
31,749✔
202
  }
203
}
204

205
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
206
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
207
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
208
//     return *ppAppInstInfo;
209
//   } else {
210
//     return NULL;
211
//   }
212
// }
213

214
void freeQueryParam(SSyncQueryParam* param) {
936✔
215
  if (param == NULL) return;
936!
216
  if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
936!
217
    tscError("failed to destroy semaphore in freeQueryParam");
×
218
  }
219
  taosMemoryFree(param);
936!
220
}
221

222
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
11,132,948✔
223
                     SRequestObj** pRequest, int64_t reqid) {
224
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
11,132,948✔
225
  if (TSDB_CODE_SUCCESS != code) {
11,145,837!
226
    tscError("failed to malloc sqlObj, %s", sql);
×
227
    return code;
×
228
  }
229

230
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
11,145,837!
231
  if ((*pRequest)->sqlstr == NULL) {
11,133,011!
232
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
233
    destroyRequest(*pRequest);
×
234
    *pRequest = NULL;
×
235
    return terrno;
×
236
  }
237

238
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
11,133,011✔
239
  (*pRequest)->sqlstr[sqlLen] = 0;
11,139,192✔
240
  (*pRequest)->sqlLen = sqlLen;
11,139,192✔
241
  (*pRequest)->validateOnly = validateSql;
11,139,192✔
242
  (*pRequest)->isStmtBind = false;
11,139,192✔
243

244
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
11,139,192✔
245

246
  STscObj* pTscObj = (*pRequest)->pTscObj;
11,139,192✔
247
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
11,139,192✔
248
                             sizeof((*pRequest)->self));
249
  if (err) {
11,126,520!
250
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
251
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
252
    destroyRequest(*pRequest);
×
253
    *pRequest = NULL;
×
254
    return terrno;
×
255
  }
256

257
  (*pRequest)->allocatorRefId = -1;
11,126,520✔
258
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
11,126,520✔
259
    if (TSDB_CODE_SUCCESS !=
1,449,538!
260
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
1,449,521✔
261
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self,
×
262
               (*pRequest)->requestId, pTscObj->id, sql);
263
      destroyRequest(*pRequest);
×
264
      *pRequest = NULL;
×
265
      return terrno;
×
266
    }
267
  }
268

269
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
11,143,249✔
270
  return TSDB_CODE_SUCCESS;
11,138,491✔
271
}
272

273
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
509✔
274
  int32_t code =
275
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
509✔
276
  if (TSDB_CODE_SUCCESS == code) {
509!
277
    pRequest->relation.prevRefId = (*pNewRequest)->self;
509✔
278
    (*pNewRequest)->relation.nextRefId = pRequest->self;
509✔
279
    (*pNewRequest)->relation.userRefId = pRequest->self;
509✔
280
    (*pNewRequest)->isSubReq = true;
509✔
281
  }
282
  return code;
509✔
283
}
284

285
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
20,675✔
286
  STscObj* pTscObj = pRequest->pTscObj;
20,675✔
287

288
  SParseContext cxt = {.requestId = pRequest->requestId,
20,675✔
289
                       .requestRid = pRequest->self,
20,675✔
290
                       .acctId = pTscObj->acctId,
20,675✔
291
                       .db = pRequest->pDb,
20,675✔
292
                       .topicQuery = topicQuery,
293
                       .pSql = pRequest->sqlstr,
20,675✔
294
                       .sqlLen = pRequest->sqlLen,
20,675✔
295
                       .pMsg = pRequest->msgBuf,
20,675✔
296
                       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
297
                       .pTransporter = pTscObj->pAppInfo->pTransporter,
20,675✔
298
                       .pStmtCb = pStmtCb,
299
                       .pUser = pTscObj->user,
20,675✔
300
                       .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
20,675✔
301
                       .enableSysInfo = pTscObj->sysInfo,
20,675✔
302
                       .svrVer = pTscObj->sVer,
20,675✔
303
                       .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
20,675✔
304
                       .isStmtBind = pRequest->isStmtBind,
20,675✔
305
                       .setQueryFp = setQueryRequest,
306
                       .timezone = pTscObj->optionInfo.timezone,
20,675✔
307
                       .charsetCxt = pTscObj->optionInfo.charsetCxt,};
20,675✔
308

309
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
20,675✔
310
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
20,702✔
311
  if (code != TSDB_CODE_SUCCESS) {
20,700!
312
    return code;
×
313
  }
314

315
  code = qParseSql(&cxt, pQuery);
20,700✔
316
  if (TSDB_CODE_SUCCESS == code) {
20,677✔
317
    if ((*pQuery)->haveResultSet) {
20,647!
318
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols, (*pQuery)->pResExtSchema, pRequest->isStmtBind);
×
319
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
×
320
    }
321
  }
322

323
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
20,683!
324
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
20,657✔
325
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
20,657✔
326
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
20,657✔
327
  }
328

329
  taosArrayDestroy(cxt.pTableMetaPos);
20,683✔
330
  taosArrayDestroy(cxt.pTableVgroupPos);
20,682✔
331

332
  return code;
20,691✔
333
}
334

335
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
×
336
  SRetrieveTableRsp* pRsp = NULL;
×
337
  int8_t             biMode = atomic_load_8(&pRequest->pTscObj->biMode);
×
338
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode, pRequest->pTscObj->optionInfo.charsetCxt);
×
339
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
×
340
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4, pRequest->isStmtBind);
×
341
  }
342

343
  return code;
×
344
}
345

346
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
555✔
347
  // drop table if exists not_exists_table
348
  if (NULL == pQuery->pCmdMsg) {
555!
349
    return TSDB_CODE_SUCCESS;
×
350
  }
351

352
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
555✔
353
  pRequest->type = pMsgInfo->msgType;
555✔
354
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
555✔
355
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
555✔
356

357
  STscObj*      pTscObj = pRequest->pTscObj;
555✔
358
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
555✔
359

360
  // int64_t transporterId = 0;
361
  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
554!
362
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
558!
363
  return TSDB_CODE_SUCCESS;
558✔
364
}
365

366
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
21,411,400✔
367

368
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
139,426✔
369
  SRetrieveTableRsp* pRsp = NULL;
139,426✔
370
  if (pRequest->validateOnly) {
139,426✔
371
    doRequestCallback(pRequest, 0);
27✔
372
    return;
27✔
373
  }
374

375
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
139,399✔
376
                              atomic_load_8(&pRequest->pTscObj->biMode), pRequest->pTscObj->optionInfo.charsetCxt);
139,399✔
377
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
139,399✔
378
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4, pRequest->isStmtBind);
116,929✔
379
  }
380

381
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
139,399✔
382
  pRequest->code = code;
139,399✔
383

384
  if (pRequest->code != TSDB_CODE_SUCCESS) {
139,399✔
385
    pResultInfo->numOfRows = 0;
3✔
386
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
3!
387
             pRequest->requestId);
388
  } else {
389
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
139,396✔
390
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
391
             pRequest->requestId);
392
  }
393

394
  doRequestCallback(pRequest, code);
139,399✔
395
}
396

397
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
34,510✔
398
  if (pRequest->validateOnly) {
34,510!
399
    doRequestCallback(pRequest, 0);
×
400
    return TSDB_CODE_SUCCESS;
×
401
  }
402

403
  // drop table if exists not_exists_table
404
  if (NULL == pQuery->pCmdMsg) {
34,510✔
405
    doRequestCallback(pRequest, 0);
1✔
406
    return TSDB_CODE_SUCCESS;
1✔
407
  }
408

409
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
34,509✔
410
  pRequest->type = pMsgInfo->msgType;
34,509✔
411
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
34,509✔
412
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
34,509✔
413

414
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
34,509✔
415
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
34,509✔
416

417
  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
34,507✔
418
  if (code) {
34,511!
419
    doRequestCallback(pRequest, code);
×
420
  }
421
  return code;
34,511✔
422
}
423

424
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
503,627✔
425
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
503,627✔
426
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
503,627✔
427

428
  if (node1->load < node2->load) {
503,627!
429
    return -1;
×
430
  }
431

432
  return node1->load > node2->load;
503,627✔
433
}
434

435
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
161,656✔
436
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
161,656!
437
  if (pInfo->pQnodeList) {
161,656✔
438
    taosArrayDestroy(pInfo->pQnodeList);
161,030✔
439
    pInfo->pQnodeList = NULL;
161,030✔
440
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
161,030✔
441
  }
442

443
  if (pNodeList) {
161,656!
444
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
161,656✔
445
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
161,656✔
446
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
161,656✔
447
             taosArrayGetSize(pInfo->pQnodeList));
448
  }
449
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
161,656!
450

451
  return TSDB_CODE_SUCCESS;
161,656✔
452
}
453

454
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
11,109,784✔
455
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
11,109,784✔
456
    *required = false;
10,483,388✔
457
    return TSDB_CODE_SUCCESS;
10,483,388✔
458
  }
459

460
  int32_t       code = TSDB_CODE_SUCCESS;
626,396✔
461
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
626,396✔
462
  *required = false;
626,396✔
463

464
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
626,396!
465
  *required = (NULL == pInfo->pQnodeList);
626,396✔
466
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
626,396!
467
  return TSDB_CODE_SUCCESS;
626,396✔
468
}
469

470
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
×
471
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
×
472
  int32_t       code = 0;
×
473

474
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
×
475
  if (pInfo->pQnodeList) {
×
476
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
×
477
  }
478
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
×
479
  if (NULL == *pNodeList) {
×
480
    SCatalog* pCatalog = NULL;
×
481
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
482
    if (TSDB_CODE_SUCCESS == code) {
×
483
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
×
484
      if (NULL == pNodeList) {
×
485
        TSC_ERR_RET(terrno);
×
486
      }
487
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
488
                               .requestId = pRequest->requestId,
×
489
                               .requestObjRefId = pRequest->self,
×
490
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
491
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
×
492
    }
493

494
    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
×
495
      code = updateQnodeList(pInfo, *pNodeList);
×
496
    }
497
  }
498

499
  return code;
×
500
}
501

502
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
52,385✔
503
  pRequest->type = pQuery->msgType;
52,385✔
504
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
52,385✔
505

506
  SPlanContext cxt = {.queryId = pRequest->requestId,
104,853✔
507
                      .acctId = pRequest->pTscObj->acctId,
52,394✔
508
                      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
52,394✔
509
                      .pAstRoot = pQuery->pRoot,
52,459✔
510
                      .showRewrite = pQuery->showRewrite,
52,459✔
511
                      .pMsg = pRequest->msgBuf,
52,459✔
512
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
513
                      .pUser = pRequest->pTscObj->user,
52,459✔
514
                      .timezone = pRequest->pTscObj->optionInfo.timezone,
52,459✔
515
                      .sysInfo = pRequest->pTscObj->sysInfo};
52,459✔
516

517
  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
52,459✔
518
}
519

520
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols, const SExtSchema* pExtSchema, bool isStmt) {
1,416,366✔
521
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
1,416,366!
522
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
523
    return TSDB_CODE_INVALID_PARA;
×
524
  }
525

526
  pResInfo->numOfCols = numOfCols;
1,416,418✔
527
  if (pResInfo->fields != NULL) {
1,416,418✔
528
    taosMemoryFree(pResInfo->fields);
54!
529
  }
530
  if (pResInfo->userFields != NULL) {
1,416,418✔
531
    taosMemoryFree(pResInfo->userFields);
54!
532
  }
533
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
1,416,418!
534
  if (NULL == pResInfo->fields) return terrno;
1,416,384!
535
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
1,416,384!
536
  if (NULL == pResInfo->userFields) {
1,416,404!
537
    taosMemoryFree(pResInfo->fields);
×
538
    return terrno;
×
539
  }
540
  if (numOfCols != pResInfo->numOfCols) {
1,416,404!
541
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
542
    return TSDB_CODE_FAILED;
×
543
  }
544

545
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
5,543,741✔
546
    pResInfo->fields[i].type = pSchema[i].type;
4,127,286✔
547

548
    pResInfo->userFields[i].type = pSchema[i].type;
4,127,286✔
549
    // userFields must convert to type bytes, no matter isStmt or not
550
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
4,127,286✔
551
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
4,127,330✔
552
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
4,127,265!
553
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
8,802✔
554
    }
555

556
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
4,127,337✔
557
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
4,127,337✔
558
  }
559
  return TSDB_CODE_SUCCESS;
1,416,455✔
560
}
561

562
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
1,079,455✔
563
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
1,079,455!
564
      precision != TSDB_TIME_PRECISION_NANO) {
565
    return;
×
566
  }
567

568
  pResInfo->precision = precision;
1,079,455✔
569
}
570

571
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
879,935✔
572
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
879,935✔
573
  if (NULL == nodeList) {
880,065!
574
    return terrno;
×
575
  }
576
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
880,086✔
577

578
  int32_t dbNum = taosArrayGetSize(pDbVgList);
880,086✔
579
  for (int32_t i = 0; i < dbNum; ++i) {
1,735,851✔
580
    SArray* pVg = taosArrayGetP(pDbVgList, i);
855,777✔
581
    if (NULL == pVg) {
855,759!
582
      continue;
×
583
    }
584
    int32_t vgNum = taosArrayGetSize(pVg);
855,759✔
585
    if (vgNum <= 0) {
855,786✔
586
      continue;
626✔
587
    }
588

589
    for (int32_t j = 0; j < vgNum; ++j) {
4,153,192✔
590
      SVgroupInfo* pInfo = taosArrayGet(pVg, j);
3,298,031✔
591
      if (NULL == pInfo) {
3,297,867!
592
        taosArrayDestroy(nodeList);
×
593
        return TSDB_CODE_OUT_OF_RANGE;
×
594
      }
595
      SQueryNodeLoad load = {0};
3,297,867✔
596
      load.addr.nodeId = pInfo->vgId;
3,297,867✔
597
      load.addr.epSet = pInfo->epSet;
3,297,867✔
598

599
      if (NULL == taosArrayPush(nodeList, &load)) {
3,298,032!
600
        taosArrayDestroy(nodeList);
×
601
        return terrno;
×
602
      }
603
    }
604
  }
605

606
  int32_t vnodeNum = taosArrayGetSize(nodeList);
880,074✔
607
  if (vnodeNum > 0) {
880,123✔
608
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
854,407✔
609
    goto _return;
854,411✔
610
  }
611

612
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
25,716✔
613
  if (mnodeNum <= 0) {
25,694!
614
    tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
×
615
    goto _return;
×
616
  }
617

618
  void* pData = taosArrayGet(pMnodeList, 0);
25,694✔
619
  if (NULL == pData) {
25,696!
620
    taosArrayDestroy(nodeList);
×
621
    return TSDB_CODE_OUT_OF_RANGE;
×
622
  }
623
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
25,696!
624
    taosArrayDestroy(nodeList);
×
625
    return terrno;
×
626
  }
627

628
  tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
25,701✔
629

630
_return:
21,727✔
631

632
  *pNodeList = nodeList;
880,078✔
633

634
  return TSDB_CODE_SUCCESS;
880,078✔
635
}
636

637
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
274,697✔
638
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
274,697✔
639
  if (NULL == nodeList) {
274,697!
640
    return terrno;
×
641
  }
642

643
  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
274,697✔
644
  if (qNodeNum > 0) {
274,697✔
645
    void* pData = taosArrayGet(pQnodeList, 0);
274,375✔
646
    if (NULL == pData) {
274,375!
647
      taosArrayDestroy(nodeList);
×
648
      return TSDB_CODE_OUT_OF_RANGE;
×
649
    }
650
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
274,375!
651
      taosArrayDestroy(nodeList);
×
652
      return terrno;
×
653
    }
654
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
274,375✔
655
    goto _return;
274,375✔
656
  }
657

658
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
322✔
659
  if (mnodeNum <= 0) {
322✔
660
    tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
4!
661
    goto _return;
4✔
662
  }
663

664
  void* pData = taosArrayGet(pMnodeList, 0);
318✔
665
  if (NULL == pData) {
318!
666
    taosArrayDestroy(nodeList);
×
667
    return TSDB_CODE_OUT_OF_RANGE;
×
668
  }
669
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
318!
670
    taosArrayDestroy(nodeList);
×
671
    return terrno;
×
672
  }
673

674
  tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
318!
675

676
_return:
×
677

678
  *pNodeList = nodeList;
274,697✔
679

680
  return TSDB_CODE_SUCCESS;
274,697✔
681
}
682

683
void freeVgList(void* list) {
32,049✔
684
  SArray* pList = *(SArray**)list;
32,049✔
685
  taosArrayDestroy(pList);
32,049✔
686
}
32,096✔
687

688
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
1,102,268✔
689
  SArray* pDbVgList = NULL;
1,102,268✔
690
  SArray* pQnodeList = NULL;
1,102,268✔
691
  FDelete fp = NULL;
1,102,268✔
692
  int32_t code = 0;
1,102,268✔
693

694
  switch (tsQueryPolicy) {
1,102,268!
695
    case QUERY_POLICY_VNODE:
827,596✔
696
    case QUERY_POLICY_CLIENT: {
697
      if (pResultMeta) {
827,596!
698
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
827,638✔
699
        if (NULL == pDbVgList) {
827,623!
700
          code = terrno;
×
701
          goto _return;
×
702
        }
703
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
827,623✔
704
        for (int32_t i = 0; i < dbNum; ++i) {
1,651,303✔
705
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
823,689✔
706
          if (pRes->code || NULL == pRes->pRes) {
823,676!
707
            continue;
×
708
          }
709

710
          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
1,647,423!
711
            code = terrno;
×
712
            goto _return;
×
713
          }
714
        }
715
      } else {
716
        fp = freeVgList;
×
717

718
        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
×
719
        if (dbNum > 0) {
×
720
          SCatalog*     pCtg = NULL;
×
721
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
×
722
          code = catalogGetHandle(pInst->clusterId, &pCtg);
×
723
          if (code != TSDB_CODE_SUCCESS) {
×
724
            goto _return;
×
725
          }
726

727
          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
×
728
          if (NULL == pDbVgList) {
×
729
            code = terrno;
×
730
            goto _return;
×
731
          }
732
          SArray* pVgList = NULL;
×
733
          for (int32_t i = 0; i < dbNum; ++i) {
×
734
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
×
735
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
×
736
                                     .requestId = pRequest->requestId,
×
737
                                     .requestObjRefId = pRequest->self,
×
738
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
×
739

740
            // catalogGetDBVgList will handle dbFName == null.
741
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
×
742
            if (code) {
×
743
              goto _return;
×
744
            }
745

746
            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
×
747
              code = terrno;
×
748
              goto _return;
×
749
            }
750
          }
751
        }
752
      }
753

754
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
827,614✔
755
      break;
827,668✔
756
    }
757
    case QUERY_POLICY_HYBRID:
274,695✔
758
    case QUERY_POLICY_QNODE: {
759
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
280,105!
760
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
5,410✔
761
        if (pRes->code) {
5,410!
762
          pQnodeList = NULL;
×
763
        } else {
764
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
5,410✔
765
          if (NULL == pQnodeList) {
5,410!
766
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
767
            goto _return;
×
768
          }
769
        }
770
      } else {
771
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
269,285✔
772
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
269,285!
773
        if (pInst->pQnodeList) {
269,287!
774
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
269,287✔
775
          if (NULL == pQnodeList) {
269,287!
776
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
777
            goto _return;
×
778
          }
779
        }
780
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
269,287!
781
      }
782

783
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
274,697✔
784
      break;
274,697✔
785
    }
786
    default:
×
787
      tscError("unknown query policy: %d", tsQueryPolicy);
×
788
      return TSDB_CODE_APP_ERROR;
×
789
  }
790

791
_return:
1,102,365✔
792
  taosArrayDestroyEx(pDbVgList, fp);
1,102,365✔
793
  taosArrayDestroy(pQnodeList);
1,102,385✔
794

795
  return code;
1,102,391✔
796
}
797

798
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
52,362✔
799
  SArray* pDbVgList = NULL;
52,362✔
800
  SArray* pQnodeList = NULL;
52,362✔
801
  int32_t code = 0;
52,362✔
802

803
  switch (tsQueryPolicy) {
52,362!
804
    case QUERY_POLICY_VNODE:
52,373✔
805
    case QUERY_POLICY_CLIENT: {
806
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
52,373✔
807
      if (dbNum > 0) {
52,424✔
808
        SCatalog*     pCtg = NULL;
32,090✔
809
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
32,090✔
810
        code = catalogGetHandle(pInst->clusterId, &pCtg);
32,090✔
811
        if (code != TSDB_CODE_SUCCESS) {
32,071!
812
          goto _return;
×
813
        }
814

815
        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
32,071✔
816
        if (NULL == pDbVgList) {
32,100✔
817
          code = terrno;
18✔
818
          goto _return;
×
819
        }
820
        SArray* pVgList = NULL;
32,082✔
821
        for (int32_t i = 0; i < dbNum; ++i) {
64,171✔
822
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
32,037✔
823
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
32,052✔
824
                                   .requestId = pRequest->requestId,
32,052✔
825
                                   .requestObjRefId = pRequest->self,
32,052✔
826
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
32,052✔
827

828
          // catalogGetDBVgList will handle dbFName == null.
829
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
32,108✔
830
          if (code) {
32,092!
831
            goto _return;
×
832
          }
833

834
          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
32,089!
835
            code = terrno;
×
836
            goto _return;
×
837
          }
838
        }
839
      }
840

841
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
52,468✔
842
      break;
52,416✔
843
    }
844
    case QUERY_POLICY_HYBRID:
×
845
    case QUERY_POLICY_QNODE: {
846
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
×
847

848
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
×
849
      break;
×
850
    }
851
    default:
×
852
      tscError("unknown query policy: %d", tsQueryPolicy);
×
853
      return TSDB_CODE_APP_ERROR;
×
854
  }
855

856
_return:
52,416✔
857

858
  taosArrayDestroyEx(pDbVgList, freeVgList);
52,416✔
859
  taosArrayDestroy(pQnodeList);
52,409✔
860

861
  return code;
52,425✔
862
}
863

864
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
52,390✔
865
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
52,390✔
866

867
  SExecResult      res = {0};
52,390✔
868
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
52,390✔
869
                           .requestId = pRequest->requestId,
52,390✔
870
                           .requestObjRefId = pRequest->self};
52,390✔
871
  SSchedulerReq    req = {
104,811✔
872
         .syncReq = true,
873
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
52,390✔
874
         .pConn = &conn,
875
         .pNodeList = pNodeList,
876
         .pDag = pDag,
877
         .sql = pRequest->sqlstr,
52,390✔
878
         .startTs = pRequest->metric.start,
52,390✔
879
         .execFp = NULL,
880
         .cbParam = NULL,
881
         .chkKillFp = chkRequestKilled,
882
         .chkKillParam = (void*)pRequest->self,
52,390✔
883
         .pExecRes = &res,
884
         .source = pRequest->source,
52,390✔
885
         .pWorkerCb = getTaskPoolWorkerCb(),
52,390✔
886
  };
887

888
  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
52,421✔
889

890
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
52,427✔
891
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
52,421✔
892

893
  if (code != TSDB_CODE_SUCCESS) {
52,421!
894
    schedulerFreeJob(&pRequest->body.queryJob, 0);
×
895

896
    pRequest->code = code;
×
897
    terrno = code;
×
898
    return pRequest->code;
9✔
899
  }
900

901
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
52,421!
902
      TDMT_VND_CREATE_TABLE == pRequest->type) {
169✔
903
    pRequest->body.resInfo.numOfRows = res.numOfRows;
52,388✔
904
    if (TDMT_VND_SUBMIT == pRequest->type) {
52,388✔
905
      STscObj*            pTscObj = pRequest->pTscObj;
52,274✔
906
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
52,274✔
907
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
52,274✔
908
    }
909

910
    schedulerFreeJob(&pRequest->body.queryJob, 0);
52,407✔
911
  }
912

913
  pRequest->code = res.code;
52,451✔
914
  terrno = res.code;
52,451✔
915
  return pRequest->code;
52,429✔
916
}
917

918
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
9,699,675✔
919
  SArray*      pArray = NULL;
9,699,675✔
920
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
9,699,675✔
921
  if (NULL == pRsp->aCreateTbRsp) {
9,699,675✔
922
    return TSDB_CODE_SUCCESS;
9,635,517✔
923
  }
924

925
  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
64,158✔
926
  for (int32_t i = 0; i < tbNum; ++i) {
137,318✔
927
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
71,514✔
928
    if (pTbRsp->pMeta) {
71,507✔
929
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
50,260!
930
    }
931
  }
932

933
  return TSDB_CODE_SUCCESS;
65,804✔
934
}
935

936
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
835,991✔
937
  int32_t code = 0;
835,991✔
938
  SArray* pArray = NULL;
835,991✔
939
  SArray* pTbArray = (SArray*)res;
835,991✔
940
  int32_t tbNum = taosArrayGetSize(pTbArray);
835,991✔
941
  if (tbNum <= 0) {
835,994!
942
    return TSDB_CODE_SUCCESS;
×
943
  }
944

945
  pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
835,994✔
946
  if (NULL == pArray) {
836,018✔
947
    return terrno;
3✔
948
  }
949

950
  for (int32_t i = 0; i < tbNum; ++i) {
2,200,787✔
951
    STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
1,364,763✔
952
    if (NULL == tbInfo) {
1,364,763!
953
      code = terrno;
×
954
      goto _return;
×
955
    }
956
    STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion};
1,364,763✔
957
    if (NULL == taosArrayPush(pArray, &tbSver)) {
1,364,772!
958
      code = terrno;
×
959
      goto _return;
×
960
    }
961
  }
962

963
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
836,024✔
964
                           .requestId = pRequest->requestId,
836,024✔
965
                           .requestObjRefId = pRequest->self,
836,024✔
966
                           .mgmtEps = *epset};
967

968
  code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
836,024✔
969

970
_return:
835,995✔
971

972
  taosArrayDestroy(pArray);
835,995✔
973
  return code;
836,015✔
974
}
975

976
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
3,944✔
977
  return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
3,944✔
978
}
979

980
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
155,610✔
981
  return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
155,610✔
982
}
983

984
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
10,858,643✔
985
  if (NULL == pRequest->body.resInfo.execRes.res) {
10,858,643✔
986
    return pRequest->code;
280,291✔
987
  }
988

989
  SCatalog*     pCatalog = NULL;
10,578,352✔
990
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
10,578,352✔
991

992
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
10,579,671✔
993
  if (code) {
10,573,157!
994
    return code;
×
995
  }
996

997
  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
10,573,157✔
998
  SExecResult* pRes = &pRequest->body.resInfo.execRes;
10,610,053✔
999

1000
  switch (pRes->msgType) {
10,610,053✔
1001
    case TDMT_VND_ALTER_TABLE:
740✔
1002
    case TDMT_MND_ALTER_STB: {
1003
      code = handleAlterTbExecRes(pRes->res, pCatalog);
740✔
1004
      break;
740✔
1005
    }
1006
    case TDMT_VND_CREATE_TABLE: {
63,589✔
1007
      SArray* pList = (SArray*)pRes->res;
63,589✔
1008
      int32_t num = taosArrayGetSize(pList);
63,589✔
1009
      for (int32_t i = 0; i < num; ++i) {
162,975✔
1010
        void* res = taosArrayGetP(pList, i);
99,386✔
1011
        // handleCreateTbExecRes will handle res == null
1012
        code = handleCreateTbExecRes(res, pCatalog);
99,376✔
1013
      }
1014
      break;
63,589✔
1015
    }
1016
    case TDMT_MND_CREATE_STB: {
339✔
1017
      code = handleCreateTbExecRes(pRes->res, pCatalog);
339✔
1018
      break;
339✔
1019
    }
1020
    case TDMT_VND_SUBMIT: {
9,702,262✔
1021
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
9,702,262✔
1022

1023
      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
9,706,455✔
1024
      break;
9,701,627✔
1025
    }
1026
    case TDMT_SCH_QUERY:
836,011✔
1027
    case TDMT_SCH_MERGE_QUERY: {
1028
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
836,011✔
1029
      break;
835,996✔
1030
    }
1031
    default:
7,112✔
1032
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self, pRequest->type,
7,112!
1033
               pRequest->requestId);
1034
      code = TSDB_CODE_APP_ERROR;
×
1035
  }
1036

1037
  return code;
10,602,291✔
1038
}
1039

1040
static bool incompletaFileParsing(SNode* pStmt) {
10,829,170✔
1041
  return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
10,829,170✔
1042
}
1043

1044
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
485✔
1045
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
485✔
1046

1047
  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
485✔
1048
  if (TSDB_CODE_SUCCESS == code) {
485!
1049
    int64_t analyseStart = taosGetTimestampUs();
485✔
1050
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
485✔
1051
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
485✔
1052
  }
1053

1054
  if (TSDB_CODE_SUCCESS == code) {
485!
1055
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
485✔
1056
  }
1057

1058
  code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
485✔
1059
  handleQueryAnslyseRes(pWrapper, NULL, code);
485✔
1060
}
485✔
1061

1062
void returnToUser(SRequestObj* pRequest) {
59,246✔
1063
  if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
59,246!
1064
    // return to client
1065
    doRequestCallback(pRequest, pRequest->code);
59,245✔
1066
    return;
59,245✔
1067
  }
1068

1069
  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
1✔
1070
  if (pUserReq) {
1!
1071
    pUserReq->code = pRequest->code;
1✔
1072
    // return to client
1073
    doRequestCallback(pUserReq, pUserReq->code);
1✔
1074
    (void)releaseRequest(pRequest->relation.userRefId);
1✔
1075
    return;
1✔
1076
  } else {
1077
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1078
             pRequest->relation.userRefId, pRequest->requestId);
1079
  }
1080
}
1081

1082
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
485✔
1083
  int64_t     lastTs = 0;
485✔
1084
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
485✔
1085
  int32_t     numOfFields = taos_num_fields(pRes);
485✔
1086

1087
  int32_t code = createDataBlock(pBlock);
485✔
1088
  if (code) {
485!
1089
    return code;
×
1090
  }
1091

1092
  for (int32_t i = 0; i < numOfFields; ++i) {
1,940✔
1093
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
1,455✔
1094
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
1,455✔
1095
    if (TSDB_CODE_SUCCESS != code) {
1,455!
1096
      blockDataDestroy(*pBlock);
×
1097
      return code;
×
1098
    }
1099
  }
1100

1101
  code = blockDataEnsureCapacity(*pBlock, numOfRows);
485✔
1102
  if (TSDB_CODE_SUCCESS != code) {
485!
1103
    blockDataDestroy(*pBlock);
×
1104
    return code;
×
1105
  }
1106

1107
  for (int32_t i = 0; i < numOfRows; ++i) {
1,517✔
1108
    TAOS_ROW pRow = taos_fetch_row(pRes);
1,032✔
1109
    if (NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
1,032!
1110
      tscError("invalid data from vnode");
×
1111
      blockDataDestroy(*pBlock);
×
1112
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1113
    }
1114
    int64_t ts = *(int64_t*)pRow[0];
1,032✔
1115
    if (lastTs < ts) {
1,032✔
1116
      lastTs = ts;
578✔
1117
    }
1118

1119
    for (int32_t j = 0; j < numOfFields; ++j) {
4,128✔
1120
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
3,096✔
1121
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
3,096✔
1122
      if (TSDB_CODE_SUCCESS != code) {
3,096!
1123
        blockDataDestroy(*pBlock);
×
1124
        return code;
×
1125
      }
1126
    }
1127

1128
    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]);
1,032!
1129
  }
1130

1131
  (*pBlock)->info.window.ekey = lastTs;
485✔
1132
  (*pBlock)->info.rows = numOfRows;
485✔
1133

1134
  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
485!
1135
  return TSDB_CODE_SUCCESS;
485✔
1136
}
1137

1138
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
486✔
1139
  SRequestObj* pRequest = (SRequestObj*)res;
486✔
1140
  if (pRequest->code) {
486✔
1141
    returnToUser(pRequest);
1✔
1142
    return;
1✔
1143
  }
1144

1145
  SSDataBlock* pBlock = NULL;
485✔
1146
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
485✔
1147
  if (TSDB_CODE_SUCCESS != pRequest->code) {
485!
1148
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
×
1149
             tstrerror(pRequest->code));
1150
    returnToUser(pRequest);
×
1151
    return;
×
1152
  }
1153

1154
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
485✔
1155
  if (pNextReq) {
485!
1156
    continuePostSubQuery(pNextReq, pBlock);
485✔
1157
    (void)releaseRequest(pRequest->relation.nextRefId);
485✔
1158
  } else {
1159
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1160
             pRequest->relation.nextRefId, pRequest->requestId);
1161
  }
1162

1163
  blockDataDestroy(pBlock);
485✔
1164
}
1165

1166
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
486✔
1167
  SRequestObj* pRequest = pWrapper->pRequest;
486✔
1168
  if (TD_RES_QUERY(pRequest)) {
486!
1169
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
486✔
1170
    return;
486✔
1171
  }
1172

1173
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1174
  if (pNextReq) {
×
1175
    continuePostSubQuery(pNextReq, NULL);
×
1176
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1177
  } else {
1178
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1179
             pRequest->relation.nextRefId, pRequest->requestId);
1180
  }
1181
}
1182

1183
// todo refacto the error code  mgmt
1184
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
10,769,032✔
1185
  SSqlCallbackWrapper* pWrapper = param;
10,769,032✔
1186
  SRequestObj*         pRequest = pWrapper->pRequest;
10,769,032✔
1187
  STscObj*             pTscObj = pRequest->pTscObj;
10,769,032✔
1188

1189
  pRequest->code = code;
10,769,032✔
1190
  if (pResult) {
10,769,032!
1191
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
10,772,196✔
1192
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
10,784,266✔
1193
  }
1194

1195
  int32_t type = pRequest->type;
10,781,102✔
1196
  if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
10,781,102✔
1197
    if (pResult) {
9,818,337!
1198
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;
9,827,645✔
1199

1200
      // record the insert rows
1201
      if (TDMT_VND_SUBMIT == type) {
9,827,645✔
1202
        SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
9,616,409✔
1203
        (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
9,616,409✔
1204
      }
1205
    }
1206
    schedulerFreeJob(&pRequest->body.queryJob, 0);
9,858,750✔
1207
  }
1208

1209
  taosMemoryFree(pResult);
10,819,351!
1210
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
10,831,076✔
1211
           pRequest->requestId);
1212

1213
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
10,832,261!
1214
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64, pRequest->self,
319✔
1215
             tstrerror(code), pRequest->retry, pRequest->requestId);
1216
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
319!
1217
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1218
    }
1219
    restartAsyncQuery(pRequest, code);
319✔
1220
    return;
319✔
1221
  }
1222

1223
  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
10,831,942!
1224
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
10,831,942!
1225
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
12,468!
1226
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1227
    }
1228
  }
1229

1230
  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
10,813,395✔
1231
  int32_t code1 = handleQueryExecRsp(pRequest);
10,813,395✔
1232
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
10,828,735!
1233
    pRequest->code = code1;
×
1234
  }
1235

1236
  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
21,657,627!
1237
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
10,829,340✔
1238
    continueInsertFromCsv(pWrapper, pRequest);
×
1239
    return;
2✔
1240
  }
1241

1242
  if (pRequest->relation.nextRefId) {
10,830,667✔
1243
    handlePostSubQuery(pWrapper);
486✔
1244
  } else {
1245
    destorySqlCallbackWrapper(pWrapper);
10,830,181✔
1246
    pRequest->pWrapper = NULL;
10,834,257✔
1247

1248
    // return to client
1249
    doRequestCallback(pRequest, code);
10,834,257✔
1250
  }
1251
}
1252

1253
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
52,957✔
1254
  int32_t code = 0;
52,957✔
1255
  int32_t subplanNum = 0;
52,957✔
1256

1257
  if (pQuery->pRoot) {
52,957✔
1258
    pRequest->stmtType = pQuery->pRoot->type;
52,422✔
1259
  }
1260

1261
  if (pQuery->pRoot && !pRequest->inRetry) {
52,957!
1262
    STscObj*            pTscObj = pRequest->pTscObj;
52,437✔
1263
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
52,437✔
1264
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
52,437✔
1265
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
52,434✔
1266
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
3!
1267
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
5✔
1268
    }
1269
  }
1270

1271
  pRequest->body.execMode = pQuery->execMode;
53,006✔
1272
  switch (pQuery->execMode) {
53,006!
1273
    case QUERY_EXEC_MODE_LOCAL:
×
1274
      if (!pRequest->validateOnly) {
×
1275
        if (NULL == pQuery->pRoot) {
×
1276
          terrno = TSDB_CODE_INVALID_PARA;
×
1277
          code = terrno;
×
1278
        } else {
1279
          code = execLocalCmd(pRequest, pQuery);
×
1280
        }
1281
      }
1282
      break;
×
1283
    case QUERY_EXEC_MODE_RPC:
554✔
1284
      if (!pRequest->validateOnly) {
554!
1285
        code = execDdlQuery(pRequest, pQuery);
554✔
1286
      }
1287
      break;
558✔
1288
    case QUERY_EXEC_MODE_SCHEDULE: {
52,452✔
1289
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
52,452✔
1290
      if (NULL == pMnodeList) {
52,410!
1291
        code = terrno;
×
1292
        break;
×
1293
      }
1294
      SQueryPlan* pDag = NULL;
52,410✔
1295
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
52,410✔
1296
      if (TSDB_CODE_SUCCESS == code) {
52,404!
1297
        pRequest->body.subplanNum = pDag->numOfSubplans;
52,421✔
1298
        if (!pRequest->validateOnly) {
52,421✔
1299
          SArray* pNodeList = NULL;
52,407✔
1300
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
52,407✔
1301
          if (TSDB_CODE_SUCCESS == code) {
52,427!
1302
            code = scheduleQuery(pRequest, pDag, pNodeList);
52,437✔
1303
          }
1304
          taosArrayDestroy(pNodeList);
52,398✔
1305
        }
1306
      }
1307
      taosArrayDestroy(pMnodeList);
52,458✔
1308
      break;
52,462✔
1309
    }
1310
    case QUERY_EXEC_MODE_EMPTY_RESULT:
×
1311
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
1312
      break;
×
1313
    default:
×
1314
      break;
×
1315
  }
1316

1317
  if (!keepQuery) {
53,020!
1318
    qDestroyQuery(pQuery);
×
1319
  }
1320

1321
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
53,020!
1322
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
222!
1323
    if (TSDB_CODE_SUCCESS != ret) {
222!
1324
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret, pRequest->requestId);
×
1325
    }
1326
  }
1327

1328
  if (TSDB_CODE_SUCCESS == code) {
53,020✔
1329
    code = handleQueryExecRsp(pRequest);
52,992✔
1330
  }
1331

1332
  if (TSDB_CODE_SUCCESS != code) {
53,038✔
1333
    pRequest->code = code;
138✔
1334
  }
1335

1336
  if (res) {
53,038!
1337
    *res = pRequest->body.resInfo.execRes.res;
×
1338
    pRequest->body.resInfo.execRes.res = NULL;
×
1339
  }
1340
}
53,038✔
1341

1342
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
10,802,596✔
1343
                                 SSqlCallbackWrapper* pWrapper) {
1344
  int32_t code = TSDB_CODE_SUCCESS;
10,802,596✔
1345
  pRequest->type = pQuery->msgType;
10,802,596✔
1346
  SArray*     pMnodeList = NULL;
10,802,596✔
1347
  SQueryPlan* pDag = NULL;
10,802,596✔
1348
  int64_t     st = taosGetTimestampUs();
10,803,058✔
1349

1350
  if (!pRequest->parseOnly) {
10,803,058!
1351
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
10,808,316✔
1352
    if (NULL == pMnodeList) {
10,806,619!
1353
      code = terrno;
×
1354
    }
1355
    SPlanContext cxt = {.queryId = pRequest->requestId,
21,642,056✔
1356
                        .acctId = pRequest->pTscObj->acctId,
10,806,619✔
1357
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
10,806,619✔
1358
                        .pAstRoot = pQuery->pRoot,
10,835,437✔
1359
                        .showRewrite = pQuery->showRewrite,
10,835,437✔
1360
                        .isView = pWrapper->pParseCtx->isView,
10,835,437✔
1361
                        .isAudit = pWrapper->pParseCtx->isAudit,
10,835,437✔
1362
                        .pMsg = pRequest->msgBuf,
10,835,437✔
1363
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1364
                        .pUser = pRequest->pTscObj->user,
10,835,437✔
1365
                        .sysInfo = pRequest->pTscObj->sysInfo,
10,835,437✔
1366
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
10,835,437✔
1367
                        .allocatorId = pRequest->allocatorRefId};
10,835,437✔
1368
    if (TSDB_CODE_SUCCESS == code) {
10,835,437!
1369
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
10,836,221✔
1370
    }
1371
    if (code) {
10,777,012✔
1372
      tscError("req:0x%" PRIx64 ", failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
673!
1373
               pRequest->requestId);
1374
    } else {
1375
      pRequest->body.subplanNum = pDag->numOfSubplans;
10,776,339✔
1376
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
10,776,339✔
1377
    }
1378
  }
1379

1380
  pRequest->metric.execStart = taosGetTimestampUs();
10,768,834✔
1381
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
10,768,834✔
1382

1383
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
21,568,345!
1384
    SArray* pNodeList = NULL;
10,793,354✔
1385
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
10,793,354✔
1386
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
1,102,337✔
1387
    }
1388

1389
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
10,793,403✔
1390
                             .requestId = pRequest->requestId,
10,765,486✔
1391
                             .requestObjRefId = pRequest->self};
10,765,486✔
1392
    SSchedulerReq    req = {
21,533,281✔
1393
           .syncReq = false,
1394
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
10,765,486✔
1395
           .pConn = &conn,
1396
           .pNodeList = pNodeList,
1397
           .pDag = pDag,
1398
           .allocatorRefId = pRequest->allocatorRefId,
10,765,486✔
1399
           .sql = pRequest->sqlstr,
10,765,486✔
1400
           .startTs = pRequest->metric.start,
10,765,486✔
1401
           .execFp = schedulerExecCb,
1402
           .cbParam = pWrapper,
1403
           .chkKillFp = chkRequestKilled,
1404
           .chkKillParam = (void*)pRequest->self,
10,765,486✔
1405
           .pExecRes = NULL,
1406
           .source = pRequest->source,
10,765,486✔
1407
           .pWorkerCb = getTaskPoolWorkerCb(),
10,765,486✔
1408
    };
1409
    if (TSDB_CODE_SUCCESS == code) {
10,767,795!
1410
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
10,774,500✔
1411
    }
1412

1413
    taosArrayDestroy(pNodeList);
10,800,475✔
1414
  } else {
UNCOV
1415
    qDestroyQueryPlan(pDag);
×
1416
    tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
895✔
1417
             pRequest->requestId);
1418
    destorySqlCallbackWrapper(pWrapper);
895✔
1419
    pRequest->pWrapper = NULL;
895✔
1420
    if (TSDB_CODE_SUCCESS != code) {
895✔
1421
      pRequest->code = terrno;
673✔
1422
    }
1423

1424
    doRequestCallback(pRequest, code);
895✔
1425
  }
1426

1427
  // todo not to be released here
1428
  taosArrayDestroy(pMnodeList);
10,800,406✔
1429

1430
  return code;
10,800,279✔
1431
}
1432

1433
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
10,929,284✔
1434
  int32_t code = 0;
10,929,284✔
1435

1436
  if (pRequest->parseOnly) {
10,929,284✔
1437
    doRequestCallback(pRequest, 0);
627✔
1438
    return;
627✔
1439
  }
1440

1441
  pRequest->body.execMode = pQuery->execMode;
10,928,657✔
1442
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
10,928,657✔
1443
    destorySqlCallbackWrapper(pWrapper);
177,238✔
1444
    pRequest->pWrapper = NULL;
177,239✔
1445
  }
1446

1447
  if (pQuery->pRoot && !pRequest->inRetry) {
10,928,658!
1448
    STscObj*            pTscObj = pRequest->pTscObj;
10,966,661✔
1449
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
10,966,661✔
1450
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
10,966,661✔
1451
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
9,700,113✔
1452
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
9,625,083✔
1453
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
1,341,578✔
1454
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
932,009✔
1455
    }
1456
  }
1457

1458
  switch (pQuery->execMode) {
10,990,869!
1459
    case QUERY_EXEC_MODE_LOCAL:
139,426✔
1460
      asyncExecLocalCmd(pRequest, pQuery);
139,426✔
1461
      break;
139,426✔
1462
    case QUERY_EXEC_MODE_RPC:
34,512✔
1463
      code = asyncExecDdlQuery(pRequest, pQuery);
34,512✔
1464
      break;
34,512✔
1465
    case QUERY_EXEC_MODE_SCHEDULE: {
10,813,630✔
1466
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
10,813,630✔
1467
      break;
10,792,734✔
1468
    }
1469
    case QUERY_EXEC_MODE_EMPTY_RESULT:
3,301✔
1470
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
3,301✔
1471
      doRequestCallback(pRequest, 0);
3,301✔
1472
      break;
3,301✔
1473
    default:
×
1474
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
×
1475
      doRequestCallback(pRequest, -1);
×
1476
      break;
×
1477
  }
1478
}
1479

1480
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
15✔
1481
  SCatalog* pCatalog = NULL;
15✔
1482
  int32_t   code = 0;
15✔
1483
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
15✔
1484
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
15✔
1485

1486
  if (dbNum <= 0 && tblNum <= 0) {
15!
1487
    return TSDB_CODE_APP_ERROR;
15✔
1488
  }
1489

1490
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
×
1491
  if (code != TSDB_CODE_SUCCESS) {
×
1492
    return code;
×
1493
  }
1494

1495
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
×
1496
                           .requestId = pRequest->requestId,
×
1497
                           .requestObjRefId = pRequest->self,
×
1498
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
×
1499

1500
  for (int32_t i = 0; i < dbNum; ++i) {
×
1501
    char* dbFName = taosArrayGet(pRequest->dbList, i);
×
1502

1503
    // catalogRefreshDBVgInfo will handle dbFName == null.
1504
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
×
1505
    if (code != TSDB_CODE_SUCCESS) {
×
1506
      return code;
×
1507
    }
1508
  }
1509

1510
  for (int32_t i = 0; i < tblNum; ++i) {
×
1511
    SName* tableName = taosArrayGet(pRequest->tableList, i);
×
1512

1513
    // catalogRefreshTableMeta will handle tableName == null.
1514
    code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
×
1515
    if (code != TSDB_CODE_SUCCESS) {
×
1516
      return code;
×
1517
    }
1518
  }
1519

1520
  return code;
×
1521
}
1522

1523
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
14,829✔
1524
  SCatalog* pCatalog = NULL;
14,829✔
1525
  int32_t   tbNum = taosArrayGetSize(tbList);
14,829✔
1526
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
14,829✔
1527
  if (code != TSDB_CODE_SUCCESS) {
14,829!
1528
    return code;
×
1529
  }
1530

1531
  if (isView) {
14,829✔
1532
    for (int32_t i = 0; i < tbNum; ++i) {
832✔
1533
      SName* pViewName = taosArrayGet(tbList, i);
416✔
1534
      char   dbFName[TSDB_DB_FNAME_LEN];
1535
      if (NULL == pViewName) {
416!
1536
        continue;
×
1537
      }
1538
      (void)tNameGetFullDbName(pViewName, dbFName);
416✔
1539
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
416!
1540
    }
1541
  } else {
1542
    for (int32_t i = 0; i < tbNum; ++i) {
18,892✔
1543
      SName* pTbName = taosArrayGet(tbList, i);
4,479✔
1544
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
4,479!
1545
    }
1546
  }
1547

1548
  return TSDB_CODE_SUCCESS;
14,829✔
1549
}
1550

1551
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
31,749✔
1552
  pEpSet->version = 0;
31,749✔
1553

1554
  // init mnode ip set
1555
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
31,749✔
1556
  mgmtEpSet->numOfEps = 0;
31,749✔
1557
  mgmtEpSet->inUse = 0;
31,749✔
1558

1559
  if (firstEp && firstEp[0] != 0) {
31,749!
1560
    if (strlen(firstEp) >= TSDB_EP_LEN) {
31,750!
1561
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1562
      return -1;
×
1563
    }
1564

1565
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
31,750✔
1566
    if (code != TSDB_CODE_SUCCESS) {
31,751!
1567
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1568
      return terrno;
×
1569
    }
1570
    uint32_t addr = 0;
31,751✔
1571
    code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
31,751✔
1572
    if (code) {
31,753✔
1573
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
4!
1574
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1575
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
4✔
1576
    } else {
1577
      mgmtEpSet->numOfEps++;
31,749✔
1578
    }
1579
  }
1580

1581
  if (secondEp && secondEp[0] != 0) {
31,752!
1582
    if (strlen(secondEp) >= TSDB_EP_LEN) {
19,861!
1583
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1584
      return terrno;
×
1585
    }
1586

1587
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
19,861✔
1588
    if (code != TSDB_CODE_SUCCESS) {
19,861!
1589
      return code;
×
1590
    }
1591
    uint32_t addr = 0;
19,861✔
1592
    code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
19,861✔
1593
    if (code) {
19,861!
1594
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
×
1595
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1596
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
×
1597
    } else {
1598
      mgmtEpSet->numOfEps++;
19,861✔
1599
    }
1600
  }
1601

1602
  if (mgmtEpSet->numOfEps == 0) {
31,752✔
1603
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
4✔
1604
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
4✔
1605
  }
1606

1607
  return 0;
31,748✔
1608
}
1609

1610
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
31,749✔
1611
                        SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1612
  *pTscObj = NULL;
31,749✔
1613
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
31,749✔
1614
  if (TSDB_CODE_SUCCESS != code) {
31,749!
1615
    return code;
×
1616
  }
1617

1618
  SRequestObj* pRequest = NULL;
31,749✔
1619
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
31,749✔
1620
  if (TSDB_CODE_SUCCESS != code) {
31,749!
1621
    destroyTscObj(*pTscObj);
×
1622
    return code;
×
1623
  }
1624

1625
  pRequest->sqlstr = taosStrdup("taos_connect");
31,749!
1626
  if (pRequest->sqlstr) {
31,749!
1627
    pRequest->sqlLen = strlen(pRequest->sqlstr);
31,749✔
1628
  } else {
1629
    return terrno;
×
1630
  }
1631

1632
  SMsgSendInfo* body = NULL;
31,749✔
1633
  code = buildConnectMsg(pRequest, &body);
31,749✔
1634
  if (TSDB_CODE_SUCCESS != code) {
31,749!
1635
    destroyTscObj(*pTscObj);
×
1636
    return code;
×
1637
  }
1638

1639
  // int64_t transporterId = 0;
1640
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body);
31,749✔
1641
  if (TSDB_CODE_SUCCESS != code) {
31,749!
1642
    destroyTscObj(*pTscObj);
×
1643
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1644
    return code;
×
1645
  }
1646
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
31,749✔
1647
    destroyTscObj(*pTscObj);
1✔
1648
    tscError("failed to wait sem, code:%s", terrstr());
×
1649
    return terrno;
×
1650
  }
1651
  if (pRequest->code != TSDB_CODE_SUCCESS) {
31,747✔
1652
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
42!
1653
    tscError("failed to connect to server, reason: %s", errorMsg);
42!
1654

1655
    terrno = pRequest->code;
42✔
1656
    destroyRequest(pRequest);
42✔
1657
    taos_close_internal(*pTscObj);
42✔
1658
    *pTscObj = NULL;
42✔
1659
    return terrno;
42✔
1660
  } else {
1661
    tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
31,705!
1662
             (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1663
    destroyRequest(pRequest);
31,706✔
1664
  }
1665
  return code;
31,707✔
1666
}
1667

1668
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
31,749✔
1669
  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
31,749!
1670
  if (*pMsgSendInfo == NULL) {
31,749!
1671
    return terrno;
×
1672
  }
1673

1674
  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
31,749✔
1675

1676
  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
31,749✔
1677
  (*pMsgSendInfo)->requestId = pRequest->requestId;
31,749✔
1678
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
31,749✔
1679
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
31,749!
1680
  if (NULL == (*pMsgSendInfo)->param) {
31,749!
1681
    taosMemoryFree(*pMsgSendInfo);
×
1682
    return terrno;
×
1683
  }
1684

1685
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
31,749✔
1686

1687
  SConnectReq connectReq = {0};
31,749✔
1688
  STscObj*    pObj = pRequest->pTscObj;
31,749✔
1689

1690
  char* db = getDbOfConnection(pObj);
31,749✔
1691
  if (db != NULL) {
31,749✔
1692
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
1,102✔
1693
  } else if (terrno) {
30,647!
1694
    taosMemoryFree(*pMsgSendInfo);
×
1695
    return terrno;
×
1696
  }
1697
  taosMemoryFreeClear(db);
31,749!
1698

1699
  connectReq.connType = pObj->connType;
31,749✔
1700
  connectReq.pid = appInfo.pid;
31,749✔
1701
  connectReq.startTime = appInfo.startTime;
31,749✔
1702

1703
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
31,749✔
1704
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
31,749✔
1705
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
31,749✔
1706
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
31,749✔
1707

1708
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
31,749✔
1709
  void*   pReq = taosMemoryMalloc(contLen);
31,749!
1710
  if (NULL == pReq) {
31,749!
1711
    taosMemoryFree(*pMsgSendInfo);
×
1712
    return terrno;
×
1713
  }
1714

1715
  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
31,749!
1716
    taosMemoryFree(*pMsgSendInfo);
×
1717
    taosMemoryFree(pReq);
×
1718
    return terrno;
×
1719
  }
1720

1721
  (*pMsgSendInfo)->msgInfo.len = contLen;
31,749✔
1722
  (*pMsgSendInfo)->msgInfo.pData = pReq;
31,749✔
1723
  return TSDB_CODE_SUCCESS;
31,749✔
1724
}
1725

1726
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
19,381,680✔
1727
  if (NULL == pEpSet) {
19,381,680✔
1728
    return;
18,084,852✔
1729
  }
1730

1731
  switch (pSendInfo->target.type) {
1,296,828✔
1732
    case TARGET_TYPE_MNODE:
37✔
1733
      if (NULL == pTscObj) {
37!
1734
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1735
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1736
        return;
×
1737
      }
1738

1739
      SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
37✔
1740
      SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
37✔
1741
      SEp*    pNewEp = &pEpSet->eps[pEpSet->inUse];
37✔
1742
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
37!
1743
               pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
1744
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
37✔
1745
      break;
37✔
1746
    case TARGET_TYPE_VNODE: {
746,451✔
1747
      if (NULL == pTscObj) {
746,451✔
1748
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
8!
1749
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1750
        return;
8✔
1751
      }
1752

1753
      SCatalog* pCatalog = NULL;
746,443✔
1754
      int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
746,443✔
1755
      if (code != TSDB_CODE_SUCCESS) {
746,427!
1756
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1757
                 tstrerror(code));
1758
        return;
×
1759
      }
1760

1761
      code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
746,427✔
1762
      if (code != TSDB_CODE_SUCCESS) {
746,451!
1763
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1764
                 tstrerror(code));
1765
        return;
×
1766
      }
1767
      taosMemoryFreeClear(pSendInfo->target.dbFName);
746,451!
1768
      break;
746,447✔
1769
    }
1770
    default:
550,340✔
1771
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
550,340!
1772
      break;
551,928✔
1773
  }
1774
}
1775

1776
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
19,390,768✔
1777
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
19,390,768✔
1778
  if (pMsg->info.ahandle == NULL) {
19,390,768✔
1779
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
294!
1780
    rpcFreeCont(pMsg->pCont);
294✔
1781
    taosMemoryFree(pEpSet);
294!
1782
    return TSDB_CODE_TSC_INTERNAL_ERROR;
294✔
1783
  }
1784

1785
  STscObj* pTscObj = NULL;
19,390,474✔
1786

1787
  STraceId* trace = &pMsg->info.traceId;
19,390,474✔
1788
  char      tbuf[40] = {0};
19,390,474✔
1789
  TRACE_TO_STR(trace, tbuf);
19,390,474!
1790

1791
  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
19,389,994!
1792
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));
1793

1794
  if (pSendInfo->requestObjRefId != 0) {
19,390,316✔
1795
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
13,100,630✔
1796
    if (pRequest) {
13,095,502✔
1797
      if (pRequest->self != pSendInfo->requestObjRefId) {
13,091,603!
1798
        tscError("doProcessMsgFromServer req:0x%" PRId64 " != pSendInfo->requestObjRefId:0x%" PRId64,
×
1799
                 pRequest->self, pSendInfo->requestObjRefId);
1800

1801
        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
×
1802
          tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1803
        }
1804
        rpcFreeCont(pMsg->pCont);
×
1805
        taosMemoryFree(pEpSet);
×
1806
        destroySendMsgInfo(pSendInfo);
×
1807
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1808
      }
1809
      pTscObj = pRequest->pTscObj;
13,091,603✔
1810
    }
1811
  }
1812

1813
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
19,385,188✔
1814

1815
  SDataBuf buf = {.msgType = pMsg->msgType,
19,381,110✔
1816
                  .len = pMsg->contLen,
19,381,110✔
1817
                  .pData = NULL,
1818
                  .handle = pMsg->info.handle,
19,381,110✔
1819
                  .handleRefId = pMsg->info.refId,
19,381,110✔
1820
                  .pEpSet = pEpSet};
1821

1822
  if (pMsg->contLen > 0) {
19,381,110✔
1823
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
18,200,764!
1824
    if (buf.pData == NULL) {
18,203,284!
1825
      pMsg->code = terrno;
×
1826
    } else {
1827
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
18,203,284✔
1828
    }
1829
  }
1830

1831
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
19,383,630✔
1832

1833
  if (pTscObj) {
19,371,108✔
1834
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
13,079,299✔
1835
    if (TSDB_CODE_SUCCESS != code) {
13,090,234!
1836
      tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1837
      terrno = code;
×
1838
      pMsg->code = code;
×
1839
    }
1840
  }
1841

1842
  rpcFreeCont(pMsg->pCont);
19,382,043✔
1843
  destroySendMsgInfo(pSendInfo);
19,388,201✔
1844
  return TSDB_CODE_SUCCESS;
19,389,928✔
1845
}
1846

1847
int32_t doProcessMsgFromServer(void* param) {
19,392,634✔
1848
  AsyncArg* arg = (AsyncArg*)param;
19,392,634✔
1849
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
19,392,634✔
1850
  taosMemoryFree(arg);
19,388,205!
1851
  return code;
19,391,195✔
1852
}
1853

1854
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
19,366,172✔
1855
  int32_t code = 0;
19,366,172✔
1856
  SEpSet* tEpSet = NULL;
19,366,172✔
1857

1858
  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);
19,366,172✔
1859

1860
  if (pEpSet != NULL) {
19,364,497✔
1861
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
1,298,419!
1862
    if (NULL == tEpSet) {
1,298,413!
1863
      code = terrno;
×
1864
      pMsg->code = terrno;
×
1865
      goto _exit;
×
1866
    }
1867
    (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
1,298,413✔
1868
  }
1869

1870
  // pMsg is response msg
1871
  if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
19,364,491✔
1872
    // restore origin code
1873
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
31,748!
1874
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
×
1875
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
31,748!
1876
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
×
1877
    }
1878
  } else {
1879
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
1880
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
19,332,743!
1881
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
×
1882
    }
1883
  }
1884

1885
  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
19,364,491!
1886
  if (NULL == arg) {
19,369,696!
1887
    code = terrno;
×
1888
    pMsg->code = code;
×
1889
    goto _exit;
×
1890
  }
1891

1892
  arg->msg = *pMsg;
19,369,696✔
1893
  arg->pEpset = tEpSet;
19,369,696✔
1894

1895
  if ((code = taosAsyncExec(doProcessMsgFromServer, arg, NULL)) != 0) {
19,369,696✔
1896
    pMsg->code = code;
1,727✔
1897
    taosMemoryFree(arg);
1,727!
1898
    goto _exit;
×
1899
  }
1900
  return;
19,385,173✔
1901
  
1902
_exit:
×
1903
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
×
1904
  code = doProcessMsgFromServerImpl(pMsg, tEpSet);
×
1905
  if (code != 0) {
×
1906
    tscError("failed to sched msg to tsc, tsc ready quit");
×
1907
  }
1908
}
1909

1910
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
4✔
1911
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
4!
1912
  if (user == NULL) {
4!
1913
    user = TSDB_DEFAULT_USER;
×
1914
  }
1915

1916
  if (auth == NULL) {
4!
1917
    tscError("No auth info is given, failed to connect to server");
×
1918
    return NULL;
×
1919
  }
1920

1921
  STscObj* pObj = NULL;
4✔
1922
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
4✔
1923
  if (TSDB_CODE_SUCCESS == code) {
4✔
1924
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1!
1925
    if (NULL == rid) {
1!
1926
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
1927
    }
1928
    *rid = pObj->id;
1✔
1929
    return (TAOS*)rid;
1✔
1930
  }
1931

1932
  return NULL;
3✔
1933
}
1934

1935
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
1936
//                      const char* db, int dbLen, uint16_t port) {
1937
//   char ipStr[TSDB_EP_LEN] = {0};
1938
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
1939
//   char userStr[TSDB_USER_LEN] = {0};
1940
//   char passStr[TSDB_PASSWORD_LEN] = {0};
1941
//
1942
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
1943
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
1944
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
1945
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
1946
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
1947
// }
1948

1949
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
34,356,357✔
1950
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
166,383,417✔
1951
    SResultColumn* pCol = &pResultInfo->pCol[i];
132,064,748✔
1952

1953
    int32_t type = pResultInfo->fields[i].type;
132,064,748✔
1954
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->userFields[i].bytes, false);
132,064,748✔
1955

1956
    if (IS_VAR_DATA_TYPE(type)) {
132,027,060✔
1957
      if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pCol->offset[pResultInfo->current] != -1) {
25,676,069!
1958
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
24,597,556✔
1959

1960
        pResultInfo->length[i] = varDataLen(pStart);
24,597,556✔
1961
        pResultInfo->row[i] = varDataVal(pStart);
24,597,556✔
1962
      } else {
1963
        pResultInfo->row[i] = NULL;
1,078,513✔
1964
        pResultInfo->length[i] = 0;
1,078,513✔
1965
      }
1966
    } else {
1967
      if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
106,350,991✔
1968
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + schemaBytes * pResultInfo->current;
101,958,267✔
1969
        pResultInfo->length[i] = schemaBytes;
101,958,267✔
1970
      } else {
1971
        pResultInfo->row[i] = NULL;
4,392,724✔
1972
        pResultInfo->length[i] = 0;
4,392,724✔
1973
      }
1974
    }
1975
  }
1976
}
34,318,669✔
1977

1978
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
×
1979
  if (pRequest == NULL) {
×
1980
    return NULL;
×
1981
  }
1982

1983
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
×
1984
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
×
1985
    // All data has returned to App already, no need to try again
1986
    if (pResultInfo->completed) {
×
1987
      pResultInfo->numOfRows = 0;
×
1988
      return NULL;
×
1989
    }
1990

1991
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
×
1992
    SSchedulerReq   req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
×
1993

1994
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
×
1995
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
1996
      pResultInfo->numOfRows = 0;
×
1997
      return NULL;
×
1998
    }
1999

2000
    pRequest->code =
×
2001
        setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData, convertUcs4, pRequest->isStmtBind);
×
2002
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2003
      pResultInfo->numOfRows = 0;
×
2004
      return NULL;
×
2005
    }
2006

2007
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
×
2008
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
2009

2010
    STscObj*            pTscObj = pRequest->pTscObj;
×
2011
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
×
2012
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
×
2013

2014
    if (pResultInfo->numOfRows == 0) {
×
2015
      return NULL;
×
2016
    }
2017
  }
2018

2019
  if (setupOneRowPtr) {
×
2020
    doSetOneRowPtr(pResultInfo);
×
2021
    pResultInfo->current += 1;
×
2022
  }
2023

2024
  return pResultInfo->row;
×
2025
}
2026

2027
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
943,456✔
2028
  tsem_t* sem = param;
943,456✔
2029
  if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
943,456!
2030
    tscError("failed to post sem, code:%s", terrstr());
×
2031
  }
2032
}
943,465✔
2033

2034
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
20,018,460✔
2035
  if (pRequest == NULL) {
20,018,460!
2036
    return NULL;
×
2037
  }
2038

2039
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
20,018,460✔
2040
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
20,018,460✔
2041
    // All data has returned to App already, no need to try again
2042
    if (pResultInfo->completed) {
1,548,553✔
2043
      pResultInfo->numOfRows = 0;
605,092✔
2044
      return NULL;
605,092✔
2045
    }
2046

2047
    // convert ucs4 to native multi-bytes string
2048
    pResultInfo->convertUcs4 = convertUcs4;
943,461✔
2049
    tsem_t sem;
2050
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
943,461!
2051
      tscError("failed to init sem, code:%s", terrstr());
×
2052
    }
2053
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
943,454✔
2054
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
943,460!
2055
      tscError("failed to wait sem, code:%s", terrstr());
×
2056
    }
2057
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
943,447!
2058
      tscError("failed to destroy sem, code:%s", terrstr());
×
2059
    }
2060
    pRequest->inCallback = false;
943,436✔
2061
  }
2062

2063
  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
19,413,343!
2064
    return NULL;
58,758✔
2065
  } else {
2066
    if (setupOneRowPtr) {
19,354,585✔
2067
      doSetOneRowPtr(pResultInfo);
18,679,922✔
2068
      pResultInfo->current += 1;
18,676,341✔
2069
    }
2070

2071
    return pResultInfo->row;
19,351,004✔
2072
  }
2073
}
2074

2075
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
1,421,267✔
2076
  if (pResInfo->row == NULL) {
1,421,267✔
2077
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
1,299,436!
2078
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
1,299,480!
2079
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
1,299,473!
2080
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
1,299,477!
2081

2082
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
1,299,477!
2083
      taosMemoryFree(pResInfo->row);
×
2084
      taosMemoryFree(pResInfo->pCol);
×
2085
      taosMemoryFree(pResInfo->length);
×
2086
      taosMemoryFree(pResInfo->convertBuf);
×
2087
      return terrno;
×
2088
    }
2089
  }
2090

2091
  return TSDB_CODE_SUCCESS;
1,421,308✔
2092
}
2093

2094
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
1,420,572✔
2095
  int32_t idx = -1;
1,420,572✔
2096
  iconv_t conv = taosAcquireConv(&idx, C2M, pResultInfo->charsetCxt);
1,420,572✔
2097
  if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
1,420,627!
2098

2099
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
5,637,951✔
2100
    int32_t type = pResultInfo->fields[i].type;
4,217,323✔
2101
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
4,217,323✔
2102

2103
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
4,217,312✔
2104
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
178,288!
2105
      if (p == NULL) {
178,287!
2106
        taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
×
2107
        return terrno;
×
2108
      }
2109

2110
      pResultInfo->convertBuf[i] = p;
178,287✔
2111

2112
      SResultColumn* pCol = &pResultInfo->pCol[i];
178,287✔
2113
      for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
29,531,716✔
2114
        if (pCol->offset[j] != -1) {
29,353,416✔
2115
          char* pStart = pCol->offset[j] + pCol->pData;
27,757,138✔
2116

2117
          int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p), conv);
27,757,138✔
2118
          if (len < 0 || len > schemaBytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
27,757,162!
2119
            tscError(
11!
2120
                "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
2121
                "colLength[i]):%p",
2122
                len, schemaBytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
2123
            taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
11✔
2124
            return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2125
          }
2126

2127
          varDataSetLen(p, len);
27,757,151✔
2128
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
27,757,151✔
2129
          p += (len + VARSTR_HEADER_SIZE);
27,757,151✔
2130
        }
2131
      }
2132

2133
      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
178,300✔
2134
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
178,300✔
2135
    }
2136
  }
2137
  taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
1,420,628✔
2138
  return TSDB_CODE_SUCCESS;
1,420,628✔
2139
}
2140

2141
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
1,420,607✔
2142
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
5,638,137✔
2143
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + i;
4,217,530✔
2144
    TAOS_FIELD* pField = pResultInfo->userFields + i;
4,217,530✔
2145
    int32_t type = pFieldE->type;
4,217,530✔
2146
    int32_t bufLen = 0;
4,217,530✔
2147
    char* p = NULL;
4,217,530✔
2148
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[i].pData) {
4,217,530✔
2149
      continue;
4,209,580✔
2150
    } else {
2151
      bufLen = 64;
7,950✔
2152
      p = taosMemoryRealloc(pResultInfo->convertBuf[i], bufLen * pResultInfo->numOfRows);
7,950!
2153
      pFieldE->bytes = bufLen;
7,950✔
2154
      pField->bytes = bufLen;
7,950✔
2155
    }
2156
    if (!p) return terrno;
7,950!
2157
    pResultInfo->convertBuf[i] = p;
7,950✔
2158

2159
    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
6,463,741✔
2160
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[i].pData + j * tDataTypes[type].bytes), type,
6,455,791✔
2161
                                  pFieldE->precision, pFieldE->scale, p, bufLen);
6,455,791✔
2162
      p += bufLen;
6,455,791✔
2163
      if (TSDB_CODE_SUCCESS != code) {
6,455,791!
2164
        return code;
×
2165
      }
2166
    }
2167
    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
7,950✔
2168
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
7,950✔
2169
  }
2170
  return 0;
1,420,607✔
2171
}
2172

2173
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
1,238✔
2174
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t) +
1,238✔
2175
         numOfCols * (sizeof(int8_t) + sizeof(int32_t));
2176
}
2177

2178
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
619✔
2179
  char*   p = (char*)pResultInfo->pData;
619✔
2180
  int32_t blockVersion = *(int32_t*)p;
619✔
2181

2182
  int32_t numOfRows = pResultInfo->numOfRows;
619✔
2183
  int32_t numOfCols = pResultInfo->numOfCols;
619✔
2184

2185
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2186
  // length |
2187
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
619✔
2188
  if (numOfCols != cols) {
619!
2189
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2190
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2191
  }
2192

2193
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
619✔
2194
  int32_t* colLength = (int32_t*)(p + len);
619✔
2195
  len += sizeof(int32_t) * numOfCols;
619✔
2196

2197
  char* pStart = p + len;
619✔
2198
  for (int32_t i = 0; i < numOfCols; ++i) {
3,135✔
2199
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
2,516!
2200

2201
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,516✔
2202
      int32_t* offset = (int32_t*)pStart;
702✔
2203
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
702✔
2204
      len += lenTmp;
702✔
2205
      pStart += lenTmp;
702✔
2206

2207
      int32_t estimateColLen = 0;
702✔
2208
      for (int32_t j = 0; j < numOfRows; ++j) {
3,777✔
2209
        if (offset[j] == -1) {
3,075✔
2210
          continue;
231✔
2211
        }
2212
        char* data = offset[j] + pStart;
2,844✔
2213

2214
        int32_t jsonInnerType = *data;
2,844✔
2215
        char*   jsonInnerData = data + CHAR_BYTES;
2,844✔
2216
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
2,844✔
2217
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
78✔
2218
        } else if (tTagIsJson(data)) {
2,766✔
2219
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
946✔
2220
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
1,820✔
2221
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
1,482✔
2222
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
338✔
2223
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
246✔
2224
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
92!
2225
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
92✔
2226
        } else {
2227
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
2228
          return -1;
×
2229
        }
2230
      }
2231
      len += TMAX(colLen, estimateColLen);
702✔
2232
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,814!
2233
      int32_t lenTmp = numOfRows * sizeof(int32_t);
658✔
2234
      len += (lenTmp + colLen);
658✔
2235
      pStart += lenTmp;
658✔
2236
    } else {
2237
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
1,156✔
2238
      len += (lenTmp + colLen);
1,156✔
2239
      pStart += lenTmp;
1,156✔
2240
    }
2241
    pStart += colLen;
2,516✔
2242
  }
2243

2244
  // Ensure the complete structure of the block, including the blankfill field,
2245
  // even though it is not used on the client side.
2246
  len += sizeof(bool);
619✔
2247
  return len;
619✔
2248
}
2249

2250
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
1,421,304✔
2251
  int32_t numOfRows = pResultInfo->numOfRows;
1,421,304✔
2252
  int32_t numOfCols = pResultInfo->numOfCols;
1,421,304✔
2253
  bool    needConvert = false;
1,421,304✔
2254
  for (int32_t i = 0; i < numOfCols; ++i) {
5,641,560✔
2255
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
4,220,875✔
2256
      needConvert = true;
619✔
2257
      break;
619✔
2258
    }
2259
  }
2260

2261
  if (!needConvert) {
1,421,304✔
2262
    return TSDB_CODE_SUCCESS;
1,420,694✔
2263
  }
2264

2265
  tscDebug("start to convert form json format string");
610✔
2266

2267
  char*   p = (char*)pResultInfo->pData;
610✔
2268
  int32_t blockVersion = *(int32_t*)p;
610✔
2269
  int32_t dataLen = estimateJsonLen(pResultInfo);
610✔
2270
  if (dataLen <= 0) {
619!
2271
    tscError("doConvertJson error: estimateJsonLen failed");
×
2272
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2273
  }
2274

2275
  taosMemoryFreeClear(pResultInfo->convertJson);
619!
2276
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
619!
2277
  if (pResultInfo->convertJson == NULL) return terrno;
619!
2278
  char* p1 = pResultInfo->convertJson;
619✔
2279

2280
  int32_t totalLen = 0;
619✔
2281
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
619✔
2282
  if (numOfCols != cols) {
619!
2283
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2284
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2285
  }
2286

2287
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
619✔
2288
  (void)memcpy(p1, p, len);
619✔
2289

2290
  p += len;
619✔
2291
  p1 += len;
619✔
2292
  totalLen += len;
619✔
2293

2294
  len = sizeof(int32_t) * numOfCols;
619✔
2295
  int32_t* colLength = (int32_t*)p;
619✔
2296
  int32_t* colLength1 = (int32_t*)p1;
619✔
2297
  (void)memcpy(p1, p, len);
619✔
2298
  p += len;
619✔
2299
  p1 += len;
619✔
2300
  totalLen += len;
619✔
2301

2302
  char* pStart = p;
619✔
2303
  char* pStart1 = p1;
619✔
2304
  for (int32_t i = 0; i < numOfCols; ++i) {
3,135✔
2305
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
2,516!
2306
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
2,516!
2307
    if (colLen >= dataLen) {
2,516!
2308
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
×
2309
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2310
    }
2311
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,516✔
2312
      int32_t* offset = (int32_t*)pStart;
702✔
2313
      int32_t* offset1 = (int32_t*)pStart1;
702✔
2314
      len = numOfRows * sizeof(int32_t);
702✔
2315
      (void)memcpy(pStart1, pStart, len);
702✔
2316
      pStart += len;
702✔
2317
      pStart1 += len;
702✔
2318
      totalLen += len;
702✔
2319

2320
      len = 0;
702✔
2321
      for (int32_t j = 0; j < numOfRows; ++j) {
3,777✔
2322
        if (offset[j] == -1) {
3,075✔
2323
          continue;
231✔
2324
        }
2325
        char* data = offset[j] + pStart;
2,844✔
2326

2327
        int32_t jsonInnerType = *data;
2,844✔
2328
        char*   jsonInnerData = data + CHAR_BYTES;
2,844✔
2329
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
2,844✔
2330
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
2,844✔
2331
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
78✔
2332
          varDataSetLen(dst, strlen(varDataVal(dst)));
78✔
2333
        } else if (tTagIsJson(data)) {
2,766✔
2334
          char* jsonString = NULL;
946✔
2335
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
946✔
2336
          if (jsonString == NULL) {
946!
2337
            tscError("doConvertJson error: parseTagDatatoJson failed");
×
2338
            return terrno;
×
2339
          }
2340
          STR_TO_VARSTR(dst, jsonString);
946✔
2341
          taosMemoryFree(jsonString);
946!
2342
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
1,820✔
2343
          *(char*)varDataVal(dst) = '\"';
1,482✔
2344
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
1,482✔
2345
                                         varDataVal(dst) + CHAR_BYTES, pResultInfo->charsetCxt);
2346
          if (length <= 0) {
1,482✔
2347
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
6!
2348
              pResultInfo->charsetCxt != NULL ? ((SConvInfo *)(pResultInfo->charsetCxt))->charset : tsCharset);
2349
            length = 0;
6✔
2350
          }
2351
          varDataSetLen(dst, length + CHAR_BYTES * 2);
1,482✔
2352
          *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
1,482✔
2353
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
338✔
2354
          double jsonVd = *(double*)(jsonInnerData);
246✔
2355
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
246✔
2356
          varDataSetLen(dst, strlen(varDataVal(dst)));
246✔
2357
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
92!
2358
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
92✔
2359
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
92✔
2360
          varDataSetLen(dst, strlen(varDataVal(dst)));
92✔
2361
        } else {
2362
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
×
2363
          return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2364
        }
2365

2366
        offset1[j] = len;
2,844✔
2367
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
2,844✔
2368
        len += varDataTLen(dst);
2,844✔
2369
      }
2370
      colLen1 = len;
702✔
2371
      totalLen += colLen1;
702✔
2372
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
702!
2373
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,814!
2374
      len = numOfRows * sizeof(int32_t);
658✔
2375
      (void)memcpy(pStart1, pStart, len);
658✔
2376
      pStart += len;
658✔
2377
      pStart1 += len;
658✔
2378
      totalLen += len;
658✔
2379
      totalLen += colLen;
658✔
2380
      (void)memcpy(pStart1, pStart, colLen);
658✔
2381
    } else {
2382
      len = BitmapLen(pResultInfo->numOfRows);
1,156✔
2383
      (void)memcpy(pStart1, pStart, len);
1,156✔
2384
      pStart += len;
1,156✔
2385
      pStart1 += len;
1,156✔
2386
      totalLen += len;
1,156✔
2387
      totalLen += colLen;
1,156✔
2388
      (void)memcpy(pStart1, pStart, colLen);
1,156✔
2389
    }
2390
    pStart += colLen;
2,516✔
2391
    pStart1 += colLen1;
2,516✔
2392
  }
2393

2394
  // Ensure the complete structure of the block, including the blankfill field,
2395
  // even though it is not used on the client side.
2396
  // (void)memcpy(pStart1, pStart, sizeof(bool));
2397
  totalLen += sizeof(bool);
619✔
2398

2399
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
619✔
2400
  pResultInfo->pData = pResultInfo->convertJson;
619✔
2401
  return TSDB_CODE_SUCCESS;
619✔
2402
}
2403

2404
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
1,485,970✔
2405
  bool convertForDecimal = convertUcs4;
1,485,970✔
2406
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
1,485,970!
2407
    tscError("setResultDataPtr paras error");
×
2408
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2409
  }
2410

2411
  if (pResultInfo->numOfRows == 0) {
1,485,988✔
2412
    return TSDB_CODE_SUCCESS;
64,703✔
2413
  }
2414

2415
  if (pResultInfo->pData == NULL) {
1,421,285!
2416
    tscError("setResultDataPtr error: pData is NULL");
×
2417
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2418
  }
2419

2420
  int32_t code = doPrepareResPtr(pResultInfo);
1,421,285✔
2421
  if (code != TSDB_CODE_SUCCESS) {
1,421,313!
2422
    return code;
×
2423
  }
2424
  code = doConvertJson(pResultInfo);
1,421,313✔
2425
  if (code != TSDB_CODE_SUCCESS) {
1,421,308!
2426
    return code;
×
2427
  }
2428

2429
  char* p = (char*)pResultInfo->pData;
1,421,308✔
2430

2431
  // version:
2432
  int32_t blockVersion = *(int32_t*)p;
1,421,308✔
2433
  p += sizeof(int32_t);
1,421,308✔
2434

2435
  int32_t dataLen = *(int32_t*)p;
1,421,308✔
2436
  p += sizeof(int32_t);
1,421,308✔
2437

2438
  int32_t rows = *(int32_t*)p;
1,421,308✔
2439
  p += sizeof(int32_t);
1,421,308✔
2440

2441
  int32_t cols = *(int32_t*)p;
1,421,308✔
2442
  p += sizeof(int32_t);
1,421,308✔
2443

2444
  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
1,421,308!
2445
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
×
2446
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
2447
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2448
  }
2449

2450
  int32_t hasColumnSeg = *(int32_t*)p;
1,421,311✔
2451
  p += sizeof(int32_t);
1,421,311✔
2452

2453
  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)p);
1,421,311✔
2454
  p += sizeof(uint64_t);
1,421,311✔
2455

2456
  // check fields
2457
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
5,642,203✔
2458
    int8_t type = *(int8_t*)p;
4,220,896✔
2459
    p += sizeof(int8_t);
4,220,896✔
2460

2461
    int32_t bytes = *(int32_t*)p;
4,220,896✔
2462
    p += sizeof(int32_t);
4,220,896✔
2463

2464
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
4,220,896!
2465
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
×
2466
    }
2467
  }
2468

2469
  int32_t* colLength = (int32_t*)p;
1,421,307✔
2470
  p += sizeof(int32_t) * pResultInfo->numOfCols;
1,421,307✔
2471

2472
  char* pStart = p;
1,421,307✔
2473
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
5,641,939✔
2474
    if ((pStart - pResultInfo->pData) >= dataLen) {
4,220,651!
2475
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
×
2476
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2477
    }
2478
    if (blockVersion == BLOCK_VERSION_1) {
4,220,651✔
2479
      colLength[i] = htonl(colLength[i]);
2,380,752✔
2480
    }
2481
    if (colLength[i] >= dataLen) {
4,220,651!
2482
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
×
2483
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2484
    }
2485
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
4,220,651!
2486
      tscError("invalid type %d", pResultInfo->fields[i].type);
10!
2487
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2488
    }
2489
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
4,220,641✔
2490
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
1,059,312✔
2491
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
1,059,312✔
2492
    } else {
2493
      pResultInfo->pCol[i].nullbitmap = pStart;
3,161,329✔
2494
      pStart += BitmapLen(pResultInfo->numOfRows);
3,161,329✔
2495
    }
2496

2497
    pResultInfo->pCol[i].pData = pStart;
4,220,641✔
2498
    pResultInfo->length[i] = calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
4,220,641✔
2499
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
4,220,632✔
2500

2501
    pStart += colLength[i];
4,220,632✔
2502
  }
2503

2504
  p = pStart;
1,421,288✔
2505
  // bool blankFill = *(bool*)p;
2506
  p += sizeof(bool);
1,421,288✔
2507
  int32_t offset = p - pResultInfo->pData;
1,421,288✔
2508
  if (offset > dataLen) {
1,421,288!
2509
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
×
2510
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2511
  }
2512

2513
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
2514
  if (convertUcs4) {
1,421,288✔
2515
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
1,420,572✔
2516
  }
2517
#endif
2518
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
1,421,344✔
2519
    code = convertDecimalType(pResultInfo);
1,420,628✔
2520
  }
2521
  return code;
1,421,302✔
2522
}
2523

2524
char* getDbOfConnection(STscObj* pObj) {
11,198,762✔
2525
  terrno = TSDB_CODE_SUCCESS;
11,198,762✔
2526
  char* p = NULL;
11,197,042✔
2527
  (void)taosThreadMutexLock(&pObj->mutex);
11,197,042✔
2528
  size_t len = strlen(pObj->db);
11,208,873✔
2529
  if (len > 0) {
11,208,873✔
2530
    p = taosStrndup(pObj->db, tListLen(pObj->db));
10,835,614!
2531
    if (p == NULL) {
10,830,772!
2532
      tscError("failed to taosStrndup db name");
×
2533
    }
2534
  }
2535

2536
  (void)taosThreadMutexUnlock(&pObj->mutex);
11,204,031✔
2537
  return p;
11,211,916✔
2538
}
2539

2540
void setConnectionDB(STscObj* pTscObj, const char* db) {
8,698✔
2541
  if (db == NULL || pTscObj == NULL) {
8,698!
2542
    tscError("setConnectionDB para is NULL");
×
2543
    return;
×
2544
  }
2545

2546
  (void)taosThreadMutexLock(&pTscObj->mutex);
8,699✔
2547
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
8,698✔
2548
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
8,698✔
2549
}
2550

2551
void resetConnectDB(STscObj* pTscObj) {
×
2552
  if (pTscObj == NULL) {
×
2553
    return;
×
2554
  }
2555

2556
  (void)taosThreadMutexLock(&pTscObj->mutex);
×
2557
  pTscObj->db[0] = 0;
×
2558
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
×
2559
}
2560

2561
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, bool isStmt) {
1,149,047✔
2562
  if (pResultInfo == NULL || pRsp == NULL) {
1,149,047!
2563
    tscError("setQueryResultFromRsp paras is null");
×
2564
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2565
  }
2566

2567
  taosMemoryFreeClear(pResultInfo->pRspMsg);
1,149,079!
2568
  pResultInfo->pRspMsg = (const char*)pRsp;
1,149,079✔
2569
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
1,149,079✔
2570
  pResultInfo->current = 0;
1,149,058✔
2571
  pResultInfo->completed = (pRsp->completed == 1);
1,149,058✔
2572
  pResultInfo->precision = pRsp->precision;
1,149,058✔
2573

2574
  // decompress data if needed
2575
  int32_t payloadLen = htonl(pRsp->payloadLen);
1,149,058✔
2576

2577
  if (pRsp->compressed) {
1,149,058✔
2578
    if (pResultInfo->decompBuf == NULL) {
649✔
2579
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
14!
2580
      if (pResultInfo->decompBuf == NULL) {
14!
2581
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2582
        return terrno;
×
2583
      }
2584
      pResultInfo->decompBufSize = payloadLen;
14✔
2585
    } else {
2586
      if (pResultInfo->decompBufSize < payloadLen) {
635✔
2587
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
15!
2588
        if (p == NULL) {
15!
2589
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2590
          return terrno;
×
2591
        }
2592

2593
        pResultInfo->decompBuf = p;
15✔
2594
        pResultInfo->decompBufSize = payloadLen;
15✔
2595
      }
2596
    }
2597
  }
2598

2599
  if (payloadLen > 0) {
1,149,058✔
2600
    int32_t compLen = *(int32_t*)pRsp->data;
1,084,347✔
2601
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
1,084,347✔
2602

2603
    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
1,084,347✔
2604

2605
    if (pRsp->compressed && compLen < rawLen) {
1,084,347!
2606
      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
649✔
2607
      if (len < 0) {
649!
2608
        tscError("tsDecompressString failed");
×
2609
        return terrno ? terrno : TSDB_CODE_FAILED;
×
2610
      }
2611
      if (len != rawLen) {
649!
2612
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
×
2613
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2614
      }
2615
      pResultInfo->pData = pResultInfo->decompBuf;
649✔
2616
      pResultInfo->payloadLen = rawLen;
649✔
2617
    } else {
2618
      pResultInfo->pData = pStart;
1,083,698✔
2619
      pResultInfo->payloadLen = htonl(pRsp->compLen);
1,083,698✔
2620
      if (pRsp->compLen != pRsp->payloadLen) {
1,083,698!
2621
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
×
2622
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2623
      }
2624
    }
2625
  }
2626

2627
  // TODO handle the compressed case
2628
  pResultInfo->totalRows += pResultInfo->numOfRows;
1,149,058✔
2629

2630
  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
1,149,058✔
2631
  return code;
1,149,082✔
2632
}
2633

2634
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
5✔
2635
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
5✔
2636
  void*              clientRpc = NULL;
5✔
2637
  SServerStatusRsp   statusRsp = {0};
5✔
2638
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
5✔
2639
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
5✔
2640
  SRpcMsg  rpcRsp = {0};
5✔
2641
  SRpcInit rpcInit = {0};
5✔
2642
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
5✔
2643

2644
  rpcInit.label = "CHK";
5✔
2645
  rpcInit.numOfThreads = 1;
5✔
2646
  rpcInit.cfp = NULL;
5✔
2647
  rpcInit.sessions = 16;
5✔
2648
  rpcInit.connType = TAOS_CONN_CLIENT;
5✔
2649
  rpcInit.idleTime = tsShellActivityTimer * 1000;
5✔
2650
  rpcInit.compressSize = tsCompressMsgSize;
5✔
2651
  rpcInit.user = "_dnd";
5✔
2652

2653
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
5✔
2654
  connLimitNum = TMAX(connLimitNum, 10);
5✔
2655
  connLimitNum = TMIN(connLimitNum, 500);
5✔
2656
  rpcInit.connLimitNum = connLimitNum;
5✔
2657
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
5✔
2658
  rpcInit.readTimeout = tsReadTimeout;
5✔
2659
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
5!
2660
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2661
    goto _OVER;
×
2662
  }
2663

2664
  clientRpc = rpcOpen(&rpcInit);
5✔
2665
  if (clientRpc == NULL) {
5!
2666
    code = terrno;
×
2667
    tscError("failed to init server status client since %s", tstrerror(code));
×
2668
    goto _OVER;
×
2669
  }
2670

2671
  if (fqdn == NULL) {
5!
2672
    fqdn = tsLocalFqdn;
5✔
2673
  }
2674

2675
  if (port == 0) {
5!
2676
    port = tsServerPort;
5✔
2677
  }
2678

2679
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
5✔
2680
  epSet.eps[0].port = (uint16_t)port;
5✔
2681
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
5✔
2682
  if (TSDB_CODE_SUCCESS != ret) {
5!
2683
    tscError("failed to send recv since %s", tstrerror(ret));
×
2684
    goto _OVER;
×
2685
  }
2686

2687
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
5!
2688
    tscError("failed to send server status req since %s", terrstr());
1!
2689
    goto _OVER;
1✔
2690
  }
2691

2692
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
4!
2693
    tscError("failed to parse server status rsp since %s", terrstr());
×
2694
    goto _OVER;
×
2695
  }
2696

2697
  code = statusRsp.statusCode;
4✔
2698
  if (details != NULL) {
4!
2699
    tstrncpy(details, statusRsp.details, maxlen);
4✔
2700
  }
2701

2702
_OVER:
×
2703
  if (clientRpc != NULL) {
5!
2704
    rpcClose(clientRpc);
5✔
2705
  }
2706
  if (rpcRsp.pCont != NULL) {
5✔
2707
    rpcFreeCont(rpcRsp.pCont);
4✔
2708
  }
2709
  return code;
5✔
2710
}
2711

2712
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
4✔
2713
                      int32_t acctId, char* db) {
2714
  SName name = {0};
4✔
2715

2716
  if (len1 <= 0) {
4!
2717
    return -1;
×
2718
  }
2719

2720
  const char* dbName = db;
4✔
2721
  const char* tbName = NULL;
4✔
2722
  int32_t     dbLen = 0;
4✔
2723
  int32_t     tbLen = 0;
4✔
2724
  if (len2 > 0) {
4!
2725
    dbName = str + pos1;
×
2726
    dbLen = len1;
×
2727
    tbName = str + pos2;
×
2728
    tbLen = len2;
×
2729
  } else {
2730
    dbLen = strlen(db);
4✔
2731
    tbName = str + pos1;
4✔
2732
    tbLen = len1;
4✔
2733
  }
2734

2735
  if (dbLen <= 0 || tbLen <= 0) {
4!
2736
    return -1;
×
2737
  }
2738

2739
  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
4!
2740
    return -1;
×
2741
  }
2742

2743
  if (tNameAddTbName(&name, tbName, tbLen)) {
4!
2744
    return -1;
×
2745
  }
2746

2747
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
4✔
2748
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);
4✔
2749

2750
  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
4✔
2751
  if (pDb) {
4!
2752
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
×
2753
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
2754
    }
2755
  } else {
2756
    STablesReq db;
2757
    db.pTables = taosArrayInit(20, sizeof(SName));
4✔
2758
    if (NULL == db.pTables) {
4!
2759
      return terrno;
×
2760
    }
2761
    tstrncpy(db.dbFName, dbFName, TSDB_DB_FNAME_LEN);
4✔
2762
    if (NULL == taosArrayPush(db.pTables, &name)) {
8!
2763
      return terrno;
×
2764
    }
2765
    TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
4!
2766
  }
2767

2768
  return TSDB_CODE_SUCCESS;
4✔
2769
}
2770

2771
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
4✔
2772
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
4✔
2773
  if (NULL == pHash) {
4!
2774
    return terrno;
×
2775
  }
2776

2777
  bool    inEscape = false;
4✔
2778
  int32_t code = 0;
4✔
2779
  void*   pIter = NULL;
4✔
2780

2781
  int32_t vIdx = 0;
4✔
2782
  int32_t vPos[2];
2783
  int32_t vLen[2];
2784

2785
  (void)memset(vPos, -1, sizeof(vPos));
4✔
2786
  (void)memset(vLen, 0, sizeof(vLen));
4✔
2787

2788
  for (int32_t i = 0;; ++i) {
18✔
2789
    if (0 == *(tbList + i)) {
18✔
2790
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
4!
2791
        vLen[vIdx] = i - vPos[vIdx];
4✔
2792
      }
2793

2794
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
4✔
2795
      if (code) {
4!
2796
        goto _return;
×
2797
      }
2798

2799
      break;
4✔
2800
    }
2801

2802
    if ('`' == *(tbList + i)) {
14!
2803
      inEscape = !inEscape;
×
2804
      if (!inEscape) {
×
2805
        if (vPos[vIdx] >= 0) {
×
2806
          vLen[vIdx] = i - vPos[vIdx];
×
2807
        } else {
2808
          goto _return;
×
2809
        }
2810
      }
2811

2812
      continue;
×
2813
    }
2814

2815
    if (inEscape) {
14!
2816
      if (vPos[vIdx] < 0) {
×
2817
        vPos[vIdx] = i;
×
2818
      }
2819
      continue;
×
2820
    }
2821

2822
    if ('.' == *(tbList + i)) {
14!
2823
      if (vPos[vIdx] < 0) {
×
2824
        goto _return;
×
2825
      }
2826
      if (vLen[vIdx] <= 0) {
×
2827
        vLen[vIdx] = i - vPos[vIdx];
×
2828
      }
2829
      vIdx++;
×
2830
      if (vIdx >= 2) {
×
2831
        goto _return;
×
2832
      }
2833
      continue;
×
2834
    }
2835

2836
    if (',' == *(tbList + i)) {
14!
2837
      if (vPos[vIdx] < 0) {
×
2838
        goto _return;
×
2839
      }
2840
      if (vLen[vIdx] <= 0) {
×
2841
        vLen[vIdx] = i - vPos[vIdx];
×
2842
      }
2843

2844
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
×
2845
      if (code) {
×
2846
        goto _return;
×
2847
      }
2848

2849
      (void)memset(vPos, -1, sizeof(vPos));
×
2850
      (void)memset(vLen, 0, sizeof(vLen));
×
2851
      vIdx = 0;
×
2852
      continue;
×
2853
    }
2854

2855
    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
14!
2856
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
×
2857
        vLen[vIdx] = i - vPos[vIdx];
×
2858
      }
2859
      continue;
×
2860
    }
2861

2862
    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
14!
2863
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
1!
2864
      if (vLen[vIdx] > 0) {
14!
2865
        goto _return;
×
2866
      }
2867
      if (vPos[vIdx] < 0) {
14✔
2868
        vPos[vIdx] = i;
4✔
2869
      }
2870
      continue;
14✔
2871
    }
2872

2873
    goto _return;
×
2874
  }
2875

2876
  int32_t dbNum = taosHashGetSize(pHash);
4✔
2877
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
4✔
2878
  if (NULL == pReq) {
4!
2879
    TSC_ERR_JRET(terrno);
×
2880
  }
2881
  pIter = taosHashIterate(pHash, NULL);
4✔
2882
  while (pIter) {
8✔
2883
    STablesReq* pDb = (STablesReq*)pIter;
4✔
2884
    if (NULL == taosArrayPush(*pReq, pDb)) {
8!
2885
      TSC_ERR_JRET(terrno);
×
2886
    }
2887
    pIter = taosHashIterate(pHash, pIter);
4✔
2888
  }
2889

2890
  taosHashCleanup(pHash);
4✔
2891

2892
  return TSDB_CODE_SUCCESS;
4✔
2893

2894
_return:
×
2895

2896
  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
×
2897

2898
  pIter = taosHashIterate(pHash, NULL);
×
2899
  while (pIter) {
×
2900
    STablesReq* pDb = (STablesReq*)pIter;
×
2901
    taosArrayDestroy(pDb->pTables);
×
2902
    pIter = taosHashIterate(pHash, pIter);
×
2903
  }
2904

2905
  taosHashCleanup(pHash);
×
2906

2907
  return terrno;
×
2908
}
2909

2910
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
4✔
2911
  SSyncQueryParam* pParam = param;
4✔
2912
  pParam->pRequest->code = code;
4✔
2913

2914
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
4!
2915
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2916
  }
2917
}
4✔
2918

2919
void syncQueryFn(void* param, void* res, int32_t code) {
11,094,911✔
2920
  SSyncQueryParam* pParam = param;
11,094,911✔
2921
  pParam->pRequest = res;
11,094,911✔
2922

2923
  if (pParam->pRequest) {
11,094,911✔
2924
    pParam->pRequest->code = code;
11,093,304✔
2925
    clientOperateReport(pParam->pRequest);
11,093,304✔
2926
  }
2927

2928
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
11,095,374!
2929
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2930
  }
2931
}
11,103,087✔
2932

2933
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
11,091,794✔
2934
                        int8_t source) {
2935
  if (sql == NULL || NULL == fp) {
11,091,794!
2936
    terrno = TSDB_CODE_INVALID_PARA;
×
2937
    if (fp) {
×
2938
      fp(param, NULL, terrno);
×
2939
    }
2940

2941
    return;
×
2942
  }
2943

2944
  size_t sqlLen = strlen(sql);
11,100,771✔
2945
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
11,100,771!
2946
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, TSDB_MAX_ALLOWED_SQL_LEN);
×
2947
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
2948
    fp(param, NULL, terrno);
×
2949
    return;
×
2950
  }
2951

2952
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);
11,100,771✔
2953

2954
  SRequestObj* pRequest = NULL;
11,100,771✔
2955
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
11,100,771✔
2956
  if (code != TSDB_CODE_SUCCESS) {
11,093,835!
2957
    terrno = code;
×
2958
    fp(param, NULL, terrno);
×
2959
    return;
×
2960
  }
2961

2962
  pRequest->source = source;
11,093,835✔
2963
  pRequest->body.queryFp = fp;
11,093,835✔
2964
  doAsyncQuery(pRequest, false);
11,093,835✔
2965
}
2966

2967
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
1✔
2968
                                 int64_t reqid) {
2969
  if (sql == NULL || NULL == fp) {
1!
2970
    terrno = TSDB_CODE_INVALID_PARA;
×
2971
    if (fp) {
×
2972
      fp(param, NULL, terrno);
×
2973
    }
2974

2975
    return;
×
2976
  }
2977

2978
  size_t sqlLen = strlen(sql);
1✔
2979
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
1!
2980
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid,
×
2981
             TSDB_MAX_ALLOWED_SQL_LEN);
2982
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
2983
    fp(param, NULL, terrno);
×
2984
    return;
×
2985
  }
2986

2987
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);
1!
2988

2989
  SRequestObj* pRequest = NULL;
1✔
2990
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
1✔
2991
  if (code != TSDB_CODE_SUCCESS) {
1!
2992
    terrno = code;
×
2993
    fp(param, NULL, terrno);
×
2994
    return;
×
2995
  }
2996

2997
  pRequest->body.queryFp = fp;
1✔
2998
  doAsyncQuery(pRequest, false);
1✔
2999
}
3000

3001
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
11,086,474✔
3002
  if (NULL == taos) {
11,086,474!
3003
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3004
    return NULL;
×
3005
  }
3006

3007
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
11,086,474!
3008
  if (NULL == param) {
11,097,799!
3009
    return NULL;
×
3010
  }
3011
  int32_t code = tsem_init(&param->sem, 0, 0);
11,097,799✔
3012
  if (TSDB_CODE_SUCCESS != code) {
11,097,671!
3013
    taosMemoryFree(param);
×
3014
    return NULL;
×
3015
  }
3016

3017
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
11,097,671✔
3018
  code = tsem_wait(&param->sem);
11,051,751✔
3019
  if (TSDB_CODE_SUCCESS != code) {
11,100,301!
3020
    taosMemoryFree(param);
×
3021
    return NULL;
×
3022
  }
3023
  code = tsem_destroy(&param->sem);
11,100,301✔
3024
  if (TSDB_CODE_SUCCESS != code) {
11,097,075!
3025
    tscError("failed to destroy semaphore since %s", tstrerror(code));
×
3026
  }
3027

3028
  SRequestObj* pRequest = NULL;
11,096,182✔
3029
  if (param->pRequest != NULL) {
11,096,182!
3030
    param->pRequest->syncQuery = true;
11,096,182✔
3031
    pRequest = param->pRequest;
11,096,182✔
3032
    param->pRequest->inCallback = false;
11,096,182✔
3033
  }
3034
  taosMemoryFree(param);
11,096,182!
3035

3036
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3037
  //          *(int64_t*)taos, pRequest);
3038

3039
  return pRequest;
11,088,808✔
3040
}
3041

3042
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
1✔
3043
  if (NULL == taos) {
1!
3044
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3045
    return NULL;
×
3046
  }
3047

3048
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
1!
3049
  if (param == NULL) {
1!
3050
    return NULL;
×
3051
  }
3052
  int32_t code = tsem_init(&param->sem, 0, 0);
1✔
3053
  if (TSDB_CODE_SUCCESS != code) {
1!
3054
    taosMemoryFree(param);
×
3055
    return NULL;
×
3056
  }
3057

3058
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
1✔
3059
  code = tsem_wait(&param->sem);
1✔
3060
  if (TSDB_CODE_SUCCESS != code) {
1!
3061
    taosMemoryFree(param);
×
3062
    return NULL;
×
3063
  }
3064
  SRequestObj* pRequest = NULL;
1✔
3065
  if (param->pRequest != NULL) {
1!
3066
    param->pRequest->syncQuery = true;
1✔
3067
    pRequest = param->pRequest;
1✔
3068
  }
3069
  taosMemoryFree(param);
1!
3070

3071
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3072
  //   *(int64_t*)taos, pRequest);
3073

3074
  return pRequest;
1✔
3075
}
3076

3077
static void fetchCallback(void* pResult, void* param, int32_t code) {
1,031,925✔
3078
  SRequestObj* pRequest = (SRequestObj*)param;
1,031,925✔
3079

3080
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,031,925✔
3081

3082
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
1,031,925✔
3083
           pRequest->requestId);
3084

3085
  pResultInfo->pData = pResult;
1,031,908✔
3086
  pResultInfo->numOfRows = 0;
1,031,908✔
3087

3088
  if (code != TSDB_CODE_SUCCESS) {
1,031,908!
3089
    pRequest->code = code;
×
3090
    taosMemoryFreeClear(pResultInfo->pData);
×
3091
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3092
    return;
×
3093
  }
3094

3095
  if (pRequest->code != TSDB_CODE_SUCCESS) {
1,031,908!
3096
    taosMemoryFreeClear(pResultInfo->pData);
×
3097
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3098
    return;
×
3099
  }
3100

3101
  pRequest->code =
1,031,942✔
3102
      setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, pRequest->isStmtBind);
1,031,908✔
3103
  if (pRequest->code != TSDB_CODE_SUCCESS) {
1,031,942!
3104
    pResultInfo->numOfRows = 0;
×
3105
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(pRequest->code),
×
3106
             pRequest->requestId);
3107
  } else {
3108
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
1,031,942✔
3109
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
3110
             pRequest->requestId);
3111

3112
    STscObj*            pTscObj = pRequest->pTscObj;
1,031,942✔
3113
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
1,031,942✔
3114
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
1,031,942✔
3115
  }
3116

3117
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
1,031,961✔
3118
}
3119

3120
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
1,112,398✔
3121
  pRequest->body.fetchFp = fp;
1,112,398✔
3122
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
1,112,398✔
3123

3124
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,112,398✔
3125

3126
  // this query has no results or error exists, return directly
3127
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
1,112,398!
3128
    pResultInfo->numOfRows = 0;
1✔
3129
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
1✔
3130
    return;
80,429✔
3131
  }
3132

3133
  // all data has returned to App already, no need to try again
3134
  if (pResultInfo->completed) {
1,112,394✔
3135
    // it is a local executed query, no need to do async fetch
3136
    if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
80,428✔
3137
      if (pResultInfo->localResultFetched) {
2,664✔
3138
        pResultInfo->numOfRows = 0;
1,332✔
3139
        pResultInfo->current = 0;
1,332✔
3140
      } else {
3141
        pResultInfo->localResultFetched = true;
1,332✔
3142
      }
3143
    } else {
3144
      pResultInfo->numOfRows = 0;
77,764✔
3145
    }
3146

3147
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
80,428✔
3148
    return;
80,428✔
3149
  }
3150

3151
  SSchedulerReq req = {
1,031,966✔
3152
      .syncReq = false,
3153
      .fetchFp = fetchCallback,
3154
      .cbParam = pRequest,
3155
  };
3156

3157
  int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
1,031,966✔
3158
  if (TSDB_CODE_SUCCESS != code) {
1,031,963!
3159
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
×
3160
    // pRequest->body.fetchFp(param, pRequest, code);
3161
  }
3162
}
3163

3164
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
11,093,889✔
3165
  pRequest->inCallback = true;
11,093,889✔
3166
  int64_t this = pRequest->self;
11,093,889✔
3167
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
11,093,889!
3168
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
316!
3169
    code = TSDB_CODE_SUCCESS;
×
3170
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
3171
  }
3172

3173
  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
11,093,889✔
3174
           pRequest);
3175

3176
  if (pRequest->body.queryFp != NULL) {
11,093,890!
3177
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
11,094,152✔
3178
  }
3179

3180
  SRequestObj* pReq = acquireRequest(this);
11,102,267✔
3181
  if (pReq != NULL) {
11,101,264✔
3182
    pReq->inCallback = false;
11,098,007✔
3183
    (void)releaseRequest(this);
11,098,007✔
3184
  }
3185
}
11,095,727✔
3186

3187
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
936✔
3188
                       SParseSqlRes* pRes) {
3189
#ifndef TD_ENTERPRISE
3190
  return TSDB_CODE_SUCCESS;
3191
#else
3192
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
936✔
3193
#endif
3194
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc