• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4103

17 May 2025 02:18AM UTC coverage: 63.264% (+0.4%) from 62.905%
#4103

push

travis-ci

web-flow
Merge pull request #31110 from taosdata/3.0

merge 3.0

158149 of 318142 branches covered (49.71%)

Branch coverage included in aggregate %.

3 of 5 new or added lines in 1 file covered. (60.0%)

1725 existing lines in 138 files now uncovered.

243642 of 316962 relevant lines covered (76.87%)

16346281.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.94
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "scheduler.h"
22
#include "tdatablock.h"
23
#include "tdataformat.h"
24
#include "tdef.h"
25
#include "tglobal.h"
26
#include "tmsgtype.h"
27
#include "tpagedbuf.h"
28
#include "tref.h"
29
#include "tsched.h"
30
#include "tversion.h"
31
#include "decimal.h"
32

33
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
34
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
35

36
void setQueryRequest(int64_t rId) {
1,637,134✔
37
  SRequestObj* pReq = acquireRequest(rId);
1,637,134✔
38
  if (pReq != NULL) {
1,637,142✔
39
    pReq->isQuery = true;
1,637,125✔
40
    (void)releaseRequest(rId);
1,637,125✔
41
  }
42
}
1,637,128✔
43

44
/*
 * Returns true when str is non-NULL and its length is in [1, maxsize].
 *
 * The scan stops as soon as more than maxsize characters have been seen, so an
 * over-long input is rejected without walking it to the terminator.
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  size_t len = 0;
  while (str[len] != '\0') {
    if (len == maxsize) {
      // a (maxsize + 1)-th character exists: too long
      return false;
    }
    ++len;
  }

  // size_t is unsigned, so "empty" is exactly len == 0
  return len > 0;
}
56

57
/* A user name is accepted when it is non-empty and at most TSDB_USER_LEN - 1 characters long. */
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}
33,091✔
58

59
/* A password is accepted when it is non-empty and at most TSDB_PASSWORD_MAX_LEN characters long. */
static bool validatePassword(const char* passwd) {
  const size_t maxLen = TSDB_PASSWORD_MAX_LEN;
  return stringLengthCheck(passwd, maxLen);
}
33,085✔
60

61
/* A database name is accepted when it is non-empty and at most TSDB_DB_NAME_LEN - 1 characters long. */
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}
1,547✔
62

63
/*
 * Builds the "user:auth:ip:port" key that identifies an application instance
 * and returns a copy made with taosStrdup() (NULL if that copy fails). The
 * caller owns the returned string.
 *
 * ip may be NULL (taos_connect_internal passes it through unchanged when no
 * address was given). Handing NULL to "%s" is undefined behaviour, so it is
 * spelled out as "(null)" here, which is the text common C libraries emit for
 * that case, keeping previously generated keys stable.
 *
 * The key is formatted into a 512-byte buffer; longer input is truncated by
 * snprintf().
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char        key[512] = {0};
  const char* ipText = (ip != NULL) ? ip : "(null)";

  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ipText, port);
  return taosStrdup(key);
}
68

69
bool chkRequestKilled(void* param) {
36,556,911✔
70
  bool         killed = false;
36,556,911✔
71
  SRequestObj* pRequest = acquireRequest((int64_t)param);
36,556,911✔
72
  if (NULL == pRequest || pRequest->killed) {
37,108,562!
73
    killed = true;
1✔
74
  }
75

76
  (void)releaseRequest((int64_t)param);
37,108,562✔
77

78
  return killed;
37,021,518✔
79
}
80

81
void cleanupAppInfo() {
18,131✔
82
  taosHashCleanup(appInfo.pInstMap);
18,131✔
83
  taosHashCleanup(appInfo.pInstMapByClusterId);
18,131✔
84
  tscInfo("cluster instance map cleaned");
18,131!
85
}
18,131✔
86

87
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
88
                               SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
89

90
/*
 * Validates the credentials, resolves the endpoint set, looks up (or creates)
 * the application instance registered under the user/auth/ip/port key, and then
 * delegates to taosConnectImpl().
 *
 *   ip        server address; when NULL the endpoints come from tsFirst/tsSecond
 *   user      user name, length-checked by validateUserName()
 *   pass      plain password; only used (and validated) when auth is NULL
 *   auth      when non-NULL, copied as the secret instead of encrypting pass
 *   db        optional default database; copied and de-quoted when non-empty
 *   port      when non-zero, overrides the port of endpoints 0 and 1
 *   connType  forwarded unchanged to taosConnectImpl()
 *   pObj      forwarded unchanged to taosConnectImpl()
 *
 * Returns the error code of the first failing step, otherwise whatever
 * taosConnectImpl() returns.
 *
 * NOTE(review): TSC_ERR_RET / TSC_ERR_JRET are defined outside this file view;
 * from their use here they appear to return, respectively jump to _return with
 * `code` set, when the argument is not TSDB_CODE_SUCCESS - confirm.
 */
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
                              uint16_t port, int connType, STscObj** pObj) {
  TSC_ERR_RET(taos_init());
  if (!validateUserName(user)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
  }

  char localDb[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL && strlen(db) > 0) {
    if (!validateDbName(db)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
    }

    tstrncpy(localDb, db, sizeof(localDb));
    (void)strdequote(localDb);
  }

  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
    }

    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

  SCorEpSet epSet = {0};
  if (ip) {
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
  } else {
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
  }

  if (port) {
    epSet.epSet.eps[0].port = port;
    epSet.epSet.eps[1].port = port;
  }

  // From here on `key` is heap memory owned by this function until it is
  // handed to the new instance (p->instKey) or freed on the way out.
  char* key = getClusterKey(user, secretEncrypt, ip, port);
  if (NULL == key) {
    TSC_ERR_RET(terrno);
  }
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
          user, db, key);
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
  }

  SAppInstInfo** pInst = NULL;
  int32_t        code = taosThreadMutexLock(&appInfo.mutex);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    // this path leaves without passing through _return, so release the key here
    taosMemoryFreeClear(key);
    TSC_ERR_RET(code);
  }

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
  SAppInstInfo* p = NULL;
  if (pInst == NULL) {
    // no instance registered under this key yet: build and register one
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
    if (NULL == p) {
      TSC_ERR_JRET(terrno);
    }
    p->mgmtEp = epSet;
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    // ownership of the key moves to the instance; clear the local so it is not freed below
    p->instKey = key;
    key = NULL;
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);

    pInst = &p;
  } else {
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
    }
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    (void)taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    return code;
  } else {
    code = taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
      return code;
    }
    return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType, pObj);
  }
}
205

206
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
207
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
208
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
209
//     return *ppAppInstInfo;
210
//   } else {
211
//     return NULL;
212
//   }
213
// }
214

215
/* Destroys the semaphore embedded in param (logging a failure) and frees param. NULL is ignored. */
void freeQueryParam(SSyncQueryParam* param) {
  if (NULL != param) {
    if (tsem_destroy(&param->sem) != TSDB_CODE_SUCCESS) {
      tscError("failed to destroy semaphore in freeQueryParam");
    }

    taosMemoryFree(param);
  }
}
222

223
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
11,247,934✔
224
                     SRequestObj** pRequest, int64_t reqid) {
225
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
11,247,934✔
226
  if (TSDB_CODE_SUCCESS != code) {
11,262,932!
227
    tscError("failed to malloc sqlObj, %s", sql);
×
UNCOV
228
    return code;
×
229
  }
230

231
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
11,262,932!
232
  if ((*pRequest)->sqlstr == NULL) {
11,246,596!
233
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
234
    destroyRequest(*pRequest);
×
235
    *pRequest = NULL;
×
UNCOV
236
    return terrno;
×
237
  }
238

239
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
11,246,596✔
240
  (*pRequest)->sqlstr[sqlLen] = 0;
11,263,518✔
241
  (*pRequest)->sqlLen = sqlLen;
11,263,518✔
242
  (*pRequest)->validateOnly = validateSql;
11,263,518✔
243
  (*pRequest)->isStmtBind = false;
11,263,518✔
244

245
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
11,263,518✔
246

247
  STscObj* pTscObj = (*pRequest)->pTscObj;
11,263,518✔
248
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
11,263,518✔
249
                             sizeof((*pRequest)->self));
250
  if (err) {
11,244,275!
UNCOV
251
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
252
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
253
    destroyRequest(*pRequest);
×
254
    *pRequest = NULL;
×
UNCOV
255
    return terrno;
×
256
  }
257

258
  (*pRequest)->allocatorRefId = -1;
11,244,275✔
259
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
11,244,275!
260
    if (TSDB_CODE_SUCCESS !=
1,482,875!
261
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
1,482,849✔
UNCOV
262
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self,
×
263
               (*pRequest)->requestId, pTscObj->id, sql);
264
      destroyRequest(*pRequest);
×
265
      *pRequest = NULL;
×
UNCOV
266
      return terrno;
×
267
    }
268
  }
269

270
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
11,255,456✔
271
  return TSDB_CODE_SUCCESS;
11,253,532✔
272
}
273

274
/*
 * Builds a sub-request for sql on the same connection as pRequest and, on
 * success, links the two through their relation ref ids: the new request
 * becomes pRequest's "prev", and pRequest becomes the new request's "next" and
 * "user" request. Returns the code from buildRequest().
 */
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
  int32_t code =
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pPrev = *pNewRequest;
  pRequest->relation.prevRefId = pPrev->self;
  pPrev->relation.nextRefId = pRequest->self;
  pPrev->relation.userRefId = pRequest->self;
  pPrev->isSubReq = true;

  return code;
}
285

286
/*
 * Parses the SQL text stored in pRequest with qParseSql().
 *
 * The parse context is filled from the request and its connection object. When
 * parsing succeeds and the query has a result set, the result schema and time
 * precision are stored in the request. The db/table/target-table lists are
 * swapped from the query into the request on success, and also for error codes
 * accepted by NEED_CLIENT_HANDLE_ERROR().
 *
 * Returns the catalog, parser or schema-setup error code, or TSDB_CODE_SUCCESS.
 */
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj*      pTscObj = pRequest->pTscObj;
  SAppInstInfo* pAppInfo = pTscObj->pAppInfo;

  SParseContext cxt = {.requestId = pRequest->requestId,
                       .requestRid = pRequest->self,
                       .acctId = pTscObj->acctId,
                       .db = pRequest->pDb,
                       .topicQuery = topicQuery,
                       .pSql = pRequest->sqlstr,
                       .sqlLen = pRequest->sqlLen,
                       .pMsg = pRequest->msgBuf,
                       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
                       .pTransporter = pAppInfo->pTransporter,
                       .pStmtCb = pStmtCb,
                       .pUser = pTscObj->user,
                       .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
                       .enableSysInfo = pTscObj->sysInfo,
                       .svrVer = pTscObj->sVer,
                       .nodeOffline = (pAppInfo->onlineDnodes < pAppInfo->totalDnodes),
                       .isStmtBind = pRequest->isStmtBind,
                       .setQueryFp = setQueryRequest,
                       .timezone = pTscObj->optionInfo.timezone,
                       .charsetCxt = pTscObj->optionInfo.charsetCxt,};

  cxt.mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = qParseSql(&cxt, pQuery);
  if (TSDB_CODE_SUCCESS == code && (*pQuery)->haveResultSet) {
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    code = setResSchemaInfo(pResInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols, (*pQuery)->pResExtSchema,
                            pRequest->isStmtBind);
    setResPrecision(pResInfo, (*pQuery)->precision);
  }

  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
  }

  // arrays held by the parse context are released here
  taosArrayDestroy(cxt.pTableMetaPos);
  taosArrayDestroy(cxt.pTableVgroupPos);

  return code;
}
335

336
/*
 * Runs a locally executable command through qExecCommand() and, when it yields
 * a response, loads that response into the request's result info. Returns the
 * code of the last step performed.
 */
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  STscObj*           pTscObj = pRequest->pTscObj;
  SRetrieveTableRsp* pRsp = NULL;
  int8_t             biMode = atomic_load_8(&pTscObj->biMode);

  int32_t code =
      qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode, pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS != code || NULL == pRsp) {
    return code;
  }

  return setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
                               pRequest->isStmtBind);
}
346

347
/*
 * Sends the command message prepared in pQuery to the server and blocks on the
 * request's response semaphore. A query without a command message is treated
 * as already done.
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
  if (NULL == pMsgInfo) {
    // drop table if exists not_exists_table
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pMsgInfo->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);

  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));

  return TSDB_CODE_SUCCESS;
}
366

367
/* Returns the application instance attached to the request's connection object. */
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
  STscObj* pTscObj = pRequest->pTscObj;
  return pTscObj->pAppInfo;
}
21,581,429✔
368

369
/*
 * Executes a local command for an asynchronous request and reports the outcome
 * through doRequestCallback(). A validate-only request is answered with code 0
 * without executing anything.
 */
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  SRetrieveTableRsp* pRsp = NULL;
  if (pRequest->validateOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  STscObj*        pTscObj = pRequest->pTscObj;
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  int32_t code = qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, atomic_load_8(&pTscObj->biMode),
                              pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
    code = setQueryResultFromRsp(pResultInfo, pRsp, pResultInfo->convertUcs4, pRequest->isStmtBind);
  }

  pRequest->code = code;

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
             pRequest->requestId);
  } else {
    // a failed command exposes no rows
    pResultInfo->numOfRows = 0;
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
             pRequest->requestId);
  }

  doRequestCallback(pRequest, code);
}
397

398
/*
 * Asynchronous counterpart of execDdlQuery(): hands the prepared command
 * message to the transport and returns without waiting. The request callback
 * is invoked immediately with 0 for a validate-only request or a query without
 * a command message, and with the error code when sending fails.
 */
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  // second operand covers e.g.: drop table if exists not_exists_table
  if (pRequest->validateOnly || NULL == pQuery->pCmdMsg) {
    doRequestCallback(pRequest, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
  pRequest->type = pMsgInfo->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);

  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
  if (code != 0) {
    doRequestCallback(pRequest, code);
  }
  return code;
}
424

425
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
191,559✔
426
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
191,559✔
427
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
191,559✔
428

429
  if (node1->load < node2->load) {
191,559!
UNCOV
430
    return -1;
×
431
  }
432

433
  return node1->load > node2->load;
191,559✔
434
}
435

436
/*
 * Replaces the cached qnode list of pInfo under qnodeMutex.
 *
 * Any existing list is destroyed. When pNodeList is non-NULL, a copy of it is
 * stored and sorted with compareQueryNodeLoad(). If the copy cannot be created
 * the cache is left empty, the sort is skipped and terrno is returned once the
 * mutex has been released.
 * NOTE(review): assumes taosArrayDup() sets terrno when it returns NULL, as the
 * other array helpers in this file are treated - confirm.
 *
 * Returns TSDB_CODE_SUCCESS, a lock/unlock error, or the copy failure code.
 */
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  int32_t code = TSDB_CODE_SUCCESS;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    taosArrayDestroy(pInfo->pQnodeList);
    pInfo->pQnodeList = NULL;
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
  }

  if (pNodeList) {
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
    if (NULL == pInfo->pQnodeList) {
      // do not sort or log a list that was never created
      code = terrno;
    } else {
      taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
      tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
               taosArrayGetSize(pInfo->pQnodeList));
    }
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));

  return code;
}
454

455
/*
 * Tells the caller whether a qnode list still has to be fetched. Under the
 * vnode and client query policies the answer is always false; otherwise it is
 * true exactly when the instance has no cached qnode list (read under
 * qnodeMutex).
 */
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
  *required = false;

  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  *required = (NULL == pInfo->pQnodeList);
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));

  return TSDB_CODE_SUCCESS;
}
470

471
/*
 * Produces a qnode list for the request in *pNodeList.
 *
 * The list cached on the application instance is duplicated when present.
 * Otherwise (that is, when *pNodeList is still NULL afterwards) a fresh array
 * is allocated, filled through catalogGetQnodeList() and, on success, stored
 * back into the cache via updateQnodeList().
 *
 * The code reads *pNodeList before writing it when no cached list exists, so
 * callers are expected to pass it in as NULL.
 *
 * Returns TSDB_CODE_SUCCESS or the error code of the failing step.
 */
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
  if (NULL == *pNodeList) {
    SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
      // check the freshly allocated array, not the out-parameter itself
      if (NULL == *pNodeList) {
        TSC_ERR_RET(terrno);
      }
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
    }

    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
      code = updateQnodeList(pInfo, *pNodeList);
    }
  }

  return code;
}
502

503
/*
 * Records the query's message type on the request, fills a planner context
 * from the request and its connection, and asks qCreateQueryPlan() for the
 * plan. Returns the planner's code.
 */
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
  STscObj*      pTscObj = pRequest->pTscObj;

  SPlanContext cxt = {0};
  cxt.queryId = pRequest->requestId;
  cxt.acctId = pTscObj->acctId;
  cxt.mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp);
  cxt.pAstRoot = pQuery->pRoot;
  cxt.showRewrite = pQuery->showRewrite;
  cxt.pMsg = pRequest->msgBuf;
  cxt.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE;
  cxt.pUser = pTscObj->user;
  cxt.timezone = pTscObj->optionInfo.timezone;
  cxt.sysInfo = pTscObj->sysInfo;

  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
520

521
/*
 * (Re)builds the column descriptors of a result set from a schema.
 *
 *   pResInfo    result info whose fields/userFields arrays are replaced
 *   pSchema     numOfCols schema entries (type, bytes, name)
 *   numOfCols   number of columns, must be > 0
 *   pExtSchema  optional; supplies typeMod (precision/scale) for decimal columns
 *   isStmt      forwarded to calcTypeBytesFromSchemaBytes() for fields[].bytes
 *
 * Previously held arrays are freed first. Every pointer that is freed is also
 * reset to NULL, so an allocation failure leaves pResInfo with NULL arrays
 * rather than dangling ones.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments, or the
 * allocation error taken from terrno.
 */
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols, const SExtSchema* pExtSchema, bool isStmt) {
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
    return TSDB_CODE_INVALID_PARA;
  }

  pResInfo->numOfCols = numOfCols;
  if (pResInfo->fields != NULL) {
    taosMemoryFree(pResInfo->fields);
    pResInfo->fields = NULL;
  }
  if (pResInfo->userFields != NULL) {
    taosMemoryFree(pResInfo->userFields);
    pResInfo->userFields = NULL;
  }

  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
  if (NULL == pResInfo->fields) return terrno;

  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  if (NULL == pResInfo->userFields) {
    int32_t code = terrno;  // read before the free below can touch it
    taosMemoryFree(pResInfo->fields);
    pResInfo->fields = NULL;
    return code;
  }

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    pResInfo->fields[i].type = pSchema[i].type;

    pResInfo->userFields[i].type = pSchema[i].type;
    // userFields must convert to type bytes, no matter isStmt or not
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
    }

    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
  }
  return TSDB_CODE_SUCCESS;
}
562

563
/* Stores precision in pResInfo when it is one of the milli/micro/nano constants; otherwise does nothing. */
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  bool isKnown = (precision == TSDB_TIME_PRECISION_MILLI) || (precision == TSDB_TIME_PRECISION_MICRO) ||
                 (precision == TSDB_TIME_PRECISION_NANO);

  if (isKnown) {
    pResInfo->precision = precision;
  }
}
571

572
/*
 * Builds the execution node list for the vnode/client query policies.
 *
 * Every vgroup found in pDbVgList (an array of per-database vgroup arrays)
 * contributes one entry. When that yields nothing, the entries of pMnodeList
 * are used instead; when that is empty too, an empty list is returned.
 *
 * On success *pNodeList receives the new array and TSDB_CODE_SUCCESS is
 * returned. On failure the array is destroyed and an error code is returned.
 */
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == nodeList) {
    return terrno;
  }
  const char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";

  int32_t dbNum = taosArrayGetSize(pDbVgList);
  for (int32_t dbIdx = 0; dbIdx < dbNum; ++dbIdx) {
    SArray* pVg = taosArrayGetP(pDbVgList, dbIdx);
    if (NULL == pVg) {
      continue;
    }

    // a non-positive count simply skips the inner loop
    int32_t vgNum = taosArrayGetSize(pVg);
    for (int32_t vgIdx = 0; vgIdx < vgNum; ++vgIdx) {
      SVgroupInfo* pInfo = taosArrayGet(pVg, vgIdx);
      if (NULL == pInfo) {
        taosArrayDestroy(nodeList);
        return TSDB_CODE_OUT_OF_RANGE;
      }

      SQueryNodeLoad load = {0};
      load.addr.nodeId = pInfo->vgId;
      load.addr.epSet = pInfo->epSet;

      if (NULL == taosArrayPush(nodeList, &load)) {
        taosArrayDestroy(nodeList);
        return terrno;
      }
    }
  }

  int32_t vnodeNum = taosArrayGetSize(nodeList);
  if (vnodeNum > 0) {
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
  } else {
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
    } else {
      // no vnode available: fall back to the mnode entries
      void* pData = taosArrayGet(pMnodeList, 0);
      if (NULL == pData) {
        taosArrayDestroy(nodeList);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
        taosArrayDestroy(nodeList);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
    }
  }

  *pNodeList = nodeList;

  return TSDB_CODE_SUCCESS;
}
637

638
/*
 * Builds the execution node list for the qnode query policy: the entries of
 * pQnodeList when there are any, otherwise those of pMnodeList, otherwise an
 * empty list.
 *
 * On success *pNodeList receives the new array and TSDB_CODE_SUCCESS is
 * returned. On failure the array is destroyed and an error code is returned.
 */
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == nodeList) {
    return terrno;
  }

  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
  if (qNodeNum > 0) {
    void* pData = taosArrayGet(pQnodeList, 0);
    if (NULL == pData) {
      taosArrayDestroy(nodeList);
      return TSDB_CODE_OUT_OF_RANGE;
    }
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
      taosArrayDestroy(nodeList);
      return terrno;
    }
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
  } else {
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
    } else {
      // no qnode available: fall back to the mnode entries
      void* pData = taosArrayGet(pMnodeList, 0);
      if (NULL == pData) {
        taosArrayDestroy(nodeList);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
        taosArrayDestroy(nodeList);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
    }
  }

  *pNodeList = nodeList;

  return TSDB_CODE_SUCCESS;
}
683

684
void freeVgList(void* list) {
44,373✔
685
  SArray* pList = *(SArray**)list;
44,373✔
686
  taosArrayDestroy(pList);
44,373✔
687
}
44,443✔
688

689
// Build the execution node list for an asynchronous query according to the
// global tsQueryPolicy.
//
// - VNODE / CLIENT policy: collect the per-db vgroup lists, either from the
//   already fetched pResultMeta or (when pResultMeta is NULL) from the catalog,
//   and hand them to buildVnodePolicyNodeList().
// - HYBRID / QNODE policy: duplicate the qnode list from pResultMeta or, when it
//   carries none, from the app instance (under qnodeMutex), and hand it to
//   buildQnodePolicyNodeList().
//
// Returns TSDB_CODE_SUCCESS or an error code; the temporary lists built here
// are released before returning.
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  FDelete fp = NULL;  // element destructor for pDbVgList; stays NULL when the elements are borrowed from pResultMeta
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      if (pResultMeta) {
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
        for (int32_t i = 0; i < dbNum; ++i) {
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
          // skip entries that failed to load or carry no vgroup list
          if (NULL == pRes || pRes->code || NULL == pRes->pRes) {
            continue;
          }

          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
            code = terrno;
            goto _return;
          }
        }
      } else {
        // the vgroup lists fetched below are released through freeVgList
        fp = freeVgList;

        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
        if (dbNum > 0) {
          SCatalog*     pCtg = NULL;
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
          code = catalogGetHandle(pInst->clusterId, &pCtg);
          if (code != TSDB_CODE_SUCCESS) {
            goto _return;
          }

          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
          if (NULL == pDbVgList) {
            code = terrno;
            goto _return;
          }
          SArray* pVgList = NULL;
          for (int32_t i = 0; i < dbNum; ++i) {
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                     .requestId = pRequest->requestId,
                                     .requestObjRefId = pRequest->self,
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

            // catalogGetDBVgList will handle dbFName == null.
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
            if (code) {
              goto _return;
            }

            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
              code = terrno;
              // the list was not stored in pDbVgList, so release it here to avoid a leak
              taosArrayDestroy(pVgList);
              goto _return;
            }
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
        if (pRes->code) {
          pQnodeList = NULL;
        } else {
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            goto _return;
          }
        }
      } else {
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
        if (pInst->pQnodeList) {
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
          if (NULL == pQnodeList) {
            // capture the error first, then release the mutex before bailing out;
            // leaving it locked would block every later user of qnodeMutex
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            (void)taosThreadMutexUnlock(&pInst->qnodeMutex);
            goto _return;
          }
        }
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
      }

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:
  taosArrayDestroyEx(pDbVgList, fp);
  taosArrayDestroy(pQnodeList);

  return code;
}
798

799
// Build the execution node list for a synchronous query according to the
// global tsQueryPolicy.
//
// - VNODE / CLIENT policy: fetch the vgroup list of every db in
//   pRequest->dbList from the catalog and hand them to
//   buildVnodePolicyNodeList().
// - HYBRID / QNODE policy: fetch the qnode list via getQnodeList() and hand it
//   to buildQnodePolicyNodeList().
//
// Returns TSDB_CODE_SUCCESS or an error code; the temporary lists built here
// are released before returning.
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
      if (dbNum > 0) {
        SCatalog*     pCtg = NULL;
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        code = catalogGetHandle(pInst->clusterId, &pCtg);
        if (code != TSDB_CODE_SUCCESS) {
          goto _return;
        }

        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        SArray* pVgList = NULL;
        for (int32_t i = 0; i < dbNum; ++i) {
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                   .requestId = pRequest->requestId,
                                   .requestObjRefId = pRequest->self,
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

          // catalogGetDBVgList will handle dbFName == null.
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
          if (code) {
            goto _return;
          }

          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
            code = terrno;
            // the list was not stored in pDbVgList, so release it here to avoid a leak
            taosArrayDestroy(pVgList);
            goto _return;
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:

  // every element of pDbVgList is a vgroup list fetched above; freeVgList releases each one
  taosArrayDestroyEx(pDbVgList, freeVgList);
  taosArrayDestroy(pQnodeList);

  return code;
}
864

865
// Run a query plan through the scheduler synchronously.
//
// The execution result replaces pRequest->body.resInfo.execRes (the previous
// content is destroyed first). For submit / delete / create-table requests the
// affected row count is recorded and the scheduler job is freed right away.
// pRequest->code and terrno are both set to the returned code.
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  SExecResult      res = {0};
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self};
  SSchedulerReq    req = {
         .syncReq = true,
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
         .pConn = &conn,
         .pNodeList = pNodeList,
         .pDag = pDag,
         .sql = pRequest->sqlstr,
         .startTs = pRequest->metric.start,
         .execFp = NULL,
         .cbParam = NULL,
         .chkKillFp = chkRequestKilled,
         .chkKillParam = (void*)pRequest->self,
         .pExecRes = &res,
         .source = pRequest->source,
         .pWorkerCb = getTaskPoolWorkerCb(),
  };

  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);

  // take over whatever the scheduler produced, even on failure
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));

  if (code != TSDB_CODE_SUCCESS) {
    schedulerFreeJob(&pRequest->body.queryJob, 0);

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
      TDMT_VND_CREATE_TABLE == pRequest->type) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;
    if (TDMT_VND_SUBMIT == pRequest->type) {
      // account the inserted rows in the per-cluster summary
      STscObj*            pTscObj = pRequest->pTscObj;
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
    }

    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  pRequest->code = res.code;
  terrno = res.code;
  return pRequest->code;
}
918

919
// Post-process a submit response: for every auto-created table that carries
// table meta, push that meta into the catalog via handleCreateTbExecRes().
//
// `res` is interpreted as an SSubmitRsp2. pRequest and epset are not used by
// this function. Returns TSDB_CODE_SUCCESS when there is nothing to do or all
// updates succeed, otherwise the first error encountered.
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
  if (NULL == pRsp->aCreateTbRsp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
  for (int32_t i = 0; i < tbNum; ++i) {
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
    if (NULL == pTbRsp) {
      // same handling as handleQueryExecRes: report the array access failure
      return terrno;
    }
    if (pTbRsp->pMeta) {
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
    }
  }

  return TSDB_CODE_SUCCESS;
}
936

937
// Post-process a query response: `res` is an array of STbVerInfo describing
// the table versions seen during execution. They are converted to STbSVersion
// entries and handed to catalogChkTbMetaVersion().
//
// Returns TSDB_CODE_SUCCESS for an empty array, otherwise the code of the
// first failing step or of the catalog check.
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SArray* pTbVerInfos = (SArray*)res;
  int32_t tbCount = taosArrayGetSize(pTbVerInfos);
  if (tbCount <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(tbCount, sizeof(STbSVersion));
  if (NULL == pVersions) {
    return terrno;
  }

  int32_t code = 0;
  bool    collected = true;  // cleared as soon as building the version list fails
  for (int32_t idx = 0; idx < tbCount; ++idx) {
    STbVerInfo* pInfo = taosArrayGet(pTbVerInfos, idx);
    if (NULL == pInfo) {
      code = terrno;
      collected = false;
      break;
    }

    STbSVersion version = {
        .tbFName = pInfo->tbFName, .sver = pInfo->sversion, .tver = pInfo->tversion, .rver = pInfo->rversion};
    if (NULL == taosArrayPush(pVersions, &version)) {
      code = terrno;
      collected = false;
      break;
    }
  }

  // only consult the catalog when every entry was collected
  if (collected) {
    SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                             .requestId = pRequest->requestId,
                             .requestObjRefId = pRequest->self,
                             .mgmtEps = *epset};

    code = catalogChkTbMetaVersion(pCatalog, &conn, pVersions);
  }

  taosArrayDestroy(pVersions);
  return code;
}
976

977
// Apply the table meta carried by an alter-table response to the catalog.
// `res` is interpreted as an STableMetaRsp; the catalog's status is returned.
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}
980

981
// Hand the table meta carried by a create-table response to the catalog's
// asynchronous update. `res` is interpreted as an STableMetaRsp; the catalog's
// status is returned.
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogAsyncUpdateTableMeta(pCatalog, pMetaRsp);
}
984

985
// Dispatch the stored execution result of a request to the handler matching
// its message type, so the catalog can be refreshed from the response.
//
// Returns pRequest->code unchanged when there is no result payload, the
// catalog-handle error if the catalog cannot be obtained, TSDB_CODE_APP_ERROR
// for an unexpected message type, and otherwise the handler's status.
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
  SExecResult* pExecRes = &pRequest->body.resInfo.execRes;
  if (NULL == pExecRes->res) {
    return pRequest->code;
  }

  SAppInstInfo* pInst = getAppInfo(pRequest);
  SCatalog*     pCtg = NULL;

  int32_t code = catalogGetHandle(pInst->clusterId, &pCtg);
  if (0 != code) {
    return code;
  }

  SEpSet mgmtEpSet = getEpSet_s(&pInst->mgmtEp);

  switch (pExecRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB:
      code = handleAlterTbExecRes(pExecRes->res, pCtg);
      break;

    case TDMT_VND_CREATE_TABLE: {
      // the payload is an array of pointers, one per created table
      SArray* pMetaList = (SArray*)pExecRes->res;
      int32_t metaNum = taosArrayGetSize(pMetaList);
      int32_t idx = 0;
      while (idx < metaNum) {
        void* pMeta = taosArrayGetP(pMetaList, idx);
        // handleCreateTbExecRes will handle res == null
        // note: the status of the last entry is the one that is kept
        code = handleCreateTbExecRes(pMeta, pCtg);
        ++idx;
      }
      break;
    }

    case TDMT_MND_CREATE_STB:
      code = handleCreateTbExecRes(pExecRes->res, pCtg);
      break;

    case TDMT_VND_SUBMIT:
      // account the submitted bytes in the per-cluster summary
      (void)atomic_add_fetch_64((int64_t*)&pInst->summary.insertBytes, pExecRes->numOfBytes);

      code = handleSubmitExecRes(pRequest, pExecRes->res, pCtg, &mgmtEpSet);
      break;

    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      code = handleQueryExecRes(pRequest, pExecRes->res, pCtg, &mgmtEpSet);
      break;

    default:
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self, pRequest->type,
               pRequest->requestId);
      code = TSDB_CODE_APP_ERROR;
  }

  return code;
}
1040

1041
// Report the fileProcessing flag of a vnode-modify statement; any other
// statement type yields false.
static bool incompletaFileParsing(SNode* pStmt) {
  if (QUERY_NODE_VNODE_MODIFY_STMT == nodeType(pStmt)) {
    return ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
  }
  return false;
}
1044

1045
// Resume parsing and planning of a request once its sub-query has produced
// pBlock, then pass the outcome to handleQueryAnslyseRes().
//
// The node allocator of the parse context is acquired around the parse/plan
// steps. The first error among acquire, parse, plan and release is the one
// reported to handleQueryAnslyseRes().
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;

  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    int64_t analyseStart = taosGetTimestampUs();
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
  }

  // Do not let the release status hide an earlier parse/plan failure:
  // it only becomes the result when everything before it succeeded.
  int32_t releaseCode = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    code = releaseCode;
  }

  handleQueryAnslyseRes(pWrapper, NULL, code);
}
437✔
1062

1063
// Deliver the final status of pRequest to the client.
//
// When pRequest is itself the user-facing request (userRefId is 0 or equals
// its own ref id) its callback is invoked directly. Otherwise the user request
// is looked up by relation.userRefId, receives pRequest's code and has its
// callback invoked; if it cannot be acquired only an error is logged.
void returnToUser(SRequestObj* pRequest) {
  bool isUserReq = (0 == pRequest->relation.userRefId) || (pRequest->self == pRequest->relation.userRefId);
  if (!isUserReq) {
    SRequestObj* pOwner = acquireRequest(pRequest->relation.userRefId);
    if (NULL == pOwner) {
      tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
               pRequest->relation.userRefId, pRequest->requestId);
      return;
    }

    pOwner->code = pRequest->code;
    // return to client
    doRequestCallback(pOwner, pOwner->code);
    (void)releaseRequest(pRequest->relation.userRefId);
    return;
  }

  // return to client
  doRequestCallback(pRequest, pRequest->code);
}
1082

1083
// Convert numOfRows rows fetched from pRes into a newly created data block.
//
// One column is appended per result field, every fetched value is copied into
// the block, and the largest value of the first column (read as an int64
// timestamp) is stored as the block window's ekey. Each row must provide
// non-NULL values in its first three columns.
//
// On success *pBlock holds the block and TSDB_CODE_SUCCESS is returned. On
// failure after the block was created, it is destroyed and *pBlock is reset to
// NULL so that no dangling pointer is left behind.
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
  int64_t     lastTs = 0;
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
  int32_t     numOfFields = taos_num_fields(pRes);

  int32_t code = createDataBlock(pBlock);
  if (code) {
    return code;
  }

  for (int32_t i = 0; i < numOfFields; ++i) {
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(*pBlock, numOfRows);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfRows; ++i) {
    TAOS_ROW pRow = taos_fetch_row(pRes);
    // Guard the row itself and the field count before touching pRow[0..2]:
    // a missing row or a result with fewer than three columns would otherwise
    // be read out of bounds.
    if (NULL == pRow || numOfFields < 3 || NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
      tscError("invalid data from vnode");
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto _error;
    }
    int64_t ts = *(int64_t*)pRow[0];
    if (lastTs < ts) {
      lastTs = ts;
    }

    for (int32_t j = 0; j < numOfFields; ++j) {
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
      if (TSDB_CODE_SUCCESS != code) {
        goto _error;
      }
    }

    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]);
  }

  (*pBlock)->info.window.ekey = lastTs;
  (*pBlock)->info.rows = numOfRows;

  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
  return TSDB_CODE_SUCCESS;

_error:
  blockDataDestroy(*pBlock);
  *pBlock = NULL;
  return code;
}
1138

1139
// Fetch callback of a sub-query: `res` is the sub-query's request object.
//
// If the sub-query failed, or its rows cannot be turned into a data block, the
// error is returned to the user. Otherwise the block is passed to the request
// referenced by relation.nextRefId so that it can continue, and the block is
// destroyed afterwards.
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
  SRequestObj* pSubReq = (SRequestObj*)res;
  if (pSubReq->code) {
    returnToUser(pSubReq);
    return;
  }

  SSDataBlock* pResBlock = NULL;
  pSubReq->code = createResultBlock(res, rowNum, &pResBlock);
  if (TSDB_CODE_SUCCESS != pSubReq->code) {
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pSubReq->self, pSubReq->requestId,
             tstrerror(pSubReq->code));
    returnToUser(pSubReq);
    return;
  }

  SRequestObj* pFollowReq = acquireRequest(pSubReq->relation.nextRefId);
  if (NULL == pFollowReq) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pSubReq->self,
             pSubReq->relation.nextRefId, pSubReq->requestId);
  } else {
    continuePostSubQuery(pFollowReq, pResBlock);
    (void)releaseRequest(pSubReq->relation.nextRefId);
  }

  blockDataDestroy(pResBlock);
}
1166

1167
// Continue with the follow-up request once a sub-query has finished.
//
// When the sub-query's result satisfies TD_RES_QUERY, its rows are fetched
// asynchronously and postSubQueryFetchCb takes over. Otherwise the request
// referenced by relation.nextRefId is continued directly without a data block.
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
  SRequestObj* pSubReq = pWrapper->pRequest;
  if (!TD_RES_QUERY(pSubReq)) {
    SRequestObj* pFollowReq = acquireRequest(pSubReq->relation.nextRefId);
    if (NULL == pFollowReq) {
      tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pSubReq->self,
               pSubReq->relation.nextRefId, pSubReq->requestId);
    } else {
      continuePostSubQuery(pFollowReq, NULL);
      (void)releaseRequest(pSubReq->relation.nextRefId);
    }
    return;
  }

  taosAsyncFetchImpl(pSubReq, postSubQueryFetchCb, pWrapper);
}
1183

1184
// TODO: refactor the error code management
1185
// Completion callback invoked by the scheduler for an asynchronous job.
//
// `param` is the SSqlCallbackWrapper of the request. The callback stores the
// result in the request, updates row statistics for write requests, frees
// pResult, and then either restarts the query (for errors the client should
// handle itself), continues a CSV insert, chains to a post sub-query, or
// finally reports the outcome to the client.
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
  SSqlCallbackWrapper* pWrapper = param;
  SRequestObj*         pRequest = pWrapper->pRequest;
  STscObj*             pTscObj = pRequest->pTscObj;
  SExecResult*         pStored = &pRequest->body.resInfo.execRes;

  pRequest->code = code;
  if (NULL != pResult) {
    // replace the previously stored execution result with the new one
    destroyQueryExecRes(pStored);
    (void)memcpy(pStored, pResult, sizeof(*pResult));
  }

  int32_t reqType = pRequest->type;
  bool    isWriteReq = (TDMT_VND_SUBMIT == reqType) || (TDMT_VND_DELETE == reqType) || (TDMT_VND_CREATE_TABLE == reqType);
  if (isWriteReq) {
    if (NULL != pResult) {
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;

      // record the insert rows
      if (TDMT_VND_SUBMIT == reqType) {
        SAppClusterSummary* pSummary = &pTscObj->pAppInfo->summary;
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, pResult->numOfRows);
      }
    }
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  taosMemoryFree(pResult);
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
           pRequest->requestId);

  // errors the client can recover from: drop the cached meta and run the statement again
  if (TSDB_CODE_SUCCESS != code && NEED_CLIENT_HANDLE_ERROR(code) && NULL != pRequest->sqlstr) {
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64, pRequest->self,
             tstrerror(code), pRequest->retry, pRequest->requestId);
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
    restartAsyncQuery(pRequest, code);
    return;
  }

  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
  }

  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;

  // a failure while post-processing the response only matters if the request itself succeeded
  int32_t rspCode = handleQueryExecRsp(pRequest);
  if (TSDB_CODE_SUCCESS == pRequest->code && TSDB_CODE_SUCCESS != rspCode) {
    pRequest->code = rspCode;
  }

  if (TSDB_CODE_SUCCESS == pRequest->code && NULL != pRequest->pQuery &&
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
    continueInsertFromCsv(pWrapper, pRequest);
    return;
  }

  if (0 != pRequest->relation.nextRefId) {
    handlePostSubQuery(pWrapper);
    return;
  }

  destorySqlCallbackWrapper(pWrapper);
  pRequest->pWrapper = NULL;

  // return to client
  doRequestCallback(pRequest, code);
}
1253

1254
// Execute a parsed query synchronously according to its execution mode.
//
// - LOCAL:        run the command locally (skipped when validateOnly).
// - RPC:          run the DDL query (skipped when validateOnly).
// - SCHEDULE:     create the plan, build the node list and run the scheduler.
// - EMPTY_RESULT: only mark the request as an empty-result retrieval.
//
// pQuery is destroyed unless keepQuery is set. Any failure is recorded in
// pRequest->code. When `res` is non-NULL, the execution result payload is
// moved out of the request into *res.
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
  int32_t code = 0;

  if (pQuery->pRoot) {
    pRequest->stmtType = pQuery->pRoot->type;
  }

  // count insert/query requests once; retries are not counted again
  if (pQuery->pRoot && !pRequest->inRetry) {
    STscObj*            pTscObj = pRequest->pTscObj;
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
    }
  }

  pRequest->body.execMode = pQuery->execMode;
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      if (!pRequest->validateOnly) {
        if (NULL == pQuery->pRoot) {
          terrno = TSDB_CODE_INVALID_PARA;
          code = terrno;
        } else {
          code = execLocalCmd(pRequest, pQuery);
        }
      }
      break;
    case QUERY_EXEC_MODE_RPC:
      if (!pRequest->validateOnly) {
        code = execDdlQuery(pRequest, pQuery);
      }
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
      if (NULL == pMnodeList) {
        code = terrno;
        break;
      }
      SQueryPlan* pDag = NULL;
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
      if (TSDB_CODE_SUCCESS == code) {
        pRequest->body.subplanNum = pDag->numOfSubplans;
        if (!pRequest->validateOnly) {
          SArray* pNodeList = NULL;
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
          if (TSDB_CODE_SUCCESS == code) {
            code = scheduleQuery(pRequest, pDag, pNodeList);
          }
          taosArrayDestroy(pNodeList);
        }
      }
      taosArrayDestroy(pMnodeList);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    default:
      break;
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
    if (TSDB_CODE_SUCCESS != ret) {
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret, pRequest->requestId);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = handleQueryExecRsp(pRequest);
  }

  if (TSDB_CODE_SUCCESS != code) {
    pRequest->code = code;
  }

  if (res) {
    // hand the result payload over to the caller and detach it from the request
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }
}
65,946✔
1342

1343
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
10,868,362✔
1344
                                 SSqlCallbackWrapper* pWrapper) {
1345
  int32_t code = TSDB_CODE_SUCCESS;
10,868,362✔
1346
  pRequest->type = pQuery->msgType;
10,868,362✔
1347
  SArray*     pMnodeList = NULL;
10,868,362✔
1348
  SQueryPlan* pDag = NULL;
10,868,362✔
1349
  int64_t     st = taosGetTimestampUs();
10,859,325✔
1350

1351
  if (!pRequest->parseOnly) {
10,859,325!
1352
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
10,863,412✔
1353
    if (NULL == pMnodeList) {
10,861,247!
UNCOV
1354
      code = terrno;
×
1355
    }
1356
    SPlanContext cxt = {.queryId = pRequest->requestId,
21,732,207✔
1357
                        .acctId = pRequest->pTscObj->acctId,
10,861,247✔
1358
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
10,861,247✔
1359
                        .pAstRoot = pQuery->pRoot,
10,870,960✔
1360
                        .showRewrite = pQuery->showRewrite,
10,870,960✔
1361
                        .isView = pWrapper->pParseCtx->isView,
10,870,960✔
1362
                        .isAudit = pWrapper->pParseCtx->isAudit,
10,870,960✔
1363
                        .pMsg = pRequest->msgBuf,
10,870,960✔
1364
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1365
                        .pUser = pRequest->pTscObj->user,
10,870,960✔
1366
                        .sysInfo = pRequest->pTscObj->sysInfo,
10,870,960✔
1367
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
10,870,960✔
1368
                        .allocatorId = pRequest->allocatorRefId};
10,870,960✔
1369
    if (TSDB_CODE_SUCCESS == code) {
10,870,960!
1370
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
10,874,514✔
1371
    }
1372
    if (code) {
10,844,639✔
1373
      tscError("req:0x%" PRIx64 ", failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
674!
1374
               pRequest->requestId);
1375
    } else {
1376
      pRequest->body.subplanNum = pDag->numOfSubplans;
10,843,965✔
1377
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
10,843,965✔
1378
    }
1379
  }
1380

1381
  pRequest->metric.execStart = taosGetTimestampUs();
10,826,298✔
1382
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
10,826,298✔
1383

1384
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
21,676,196!
1385
    SArray* pNodeList = NULL;
10,845,986✔
1386
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
10,845,986✔
1387
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
1,028,345✔
1388
    }
1389

1390
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
10,846,011✔
1391
                             .requestId = pRequest->requestId,
10,821,643✔
1392
                             .requestObjRefId = pRequest->self};
10,821,643✔
1393
    SSchedulerReq    req = {
21,637,332✔
1394
           .syncReq = false,
1395
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
10,821,643✔
1396
           .pConn = &conn,
1397
           .pNodeList = pNodeList,
1398
           .pDag = pDag,
1399
           .allocatorRefId = pRequest->allocatorRefId,
10,821,643✔
1400
           .sql = pRequest->sqlstr,
10,821,643✔
1401
           .startTs = pRequest->metric.start,
10,821,643✔
1402
           .execFp = schedulerExecCb,
1403
           .cbParam = pWrapper,
1404
           .chkKillFp = chkRequestKilled,
1405
           .chkKillParam = (void*)pRequest->self,
10,821,643✔
1406
           .pExecRes = NULL,
1407
           .source = pRequest->source,
10,821,643✔
1408
           .pWorkerCb = getTaskPoolWorkerCb(),
10,821,643✔
1409
    };
1410
    if (TSDB_CODE_SUCCESS == code) {
10,815,689✔
1411
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
10,814,975✔
1412
    }
1413

1414
    taosArrayDestroy(pNodeList);
10,858,766✔
1415
  } else {
UNCOV
1416
    qDestroyQueryPlan(pDag);
×
1417
    tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
1,139✔
1418
             pRequest->requestId);
1419
    destorySqlCallbackWrapper(pWrapper);
1,139✔
1420
    pRequest->pWrapper = NULL;
1,139✔
1421
    if (TSDB_CODE_SUCCESS != code) {
1,139✔
1422
      pRequest->code = terrno;
674✔
1423
    }
1424

1425
    doRequestCallback(pRequest, code);
1,139✔
1426
  }
1427

1428
  // todo not to be released here
1429
  taosArrayDestroy(pMnodeList);
10,851,037✔
1430

1431
  return code;
10,851,169✔
1432
}
1433

1434
/**
 * Hand a parsed statement to the executor that matches its execution mode.
 *
 * @param pRequest     request being executed; its callback is fired directly for
 *                     parse-only requests, empty results and unknown modes
 * @param pQuery       parsed query; supplies the exec mode and the root node
 * @param pResultMeta  catalog metadata forwarded to the scheduler path
 * @param pWrapper     callback wrapper; destroyed here unless the scheduler path
 *                     takes it over
 */
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
  int32_t code = 0;

  // Nothing to execute when the caller only asked for parsing.
  if (pRequest->parseOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  pRequest->body.execMode = pQuery->execMode;

  // Only the scheduler path keeps using the wrapper.
  bool viaScheduler = (QUERY_EXEC_MODE_SCHEDULE == pRequest->body.execMode);
  if (!viaScheduler) {
    destorySqlCallbackWrapper(pWrapper);
    pRequest->pWrapper = NULL;
  }

  // Per-cluster statistics; retries are not counted a second time.
  if (NULL != pQuery->pRoot && !pRequest->inRetry) {
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;

    bool isInsert = (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) &&
                    (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType);
    if (isInsert) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
    }
  }

  // NOTE(review): the status stored in `code` is not used after the switch.
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL: {
      asyncExecLocalCmd(pRequest, pQuery);
      break;
    }
    case QUERY_EXEC_MODE_RPC: {
      code = asyncExecDdlQuery(pRequest, pQuery);
      break;
    }
    case QUERY_EXEC_MODE_SCHEDULE: {
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT: {
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      doRequestCallback(pRequest, 0);
      break;
    }
    default: {
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
      doRequestCallback(pRequest, -1);
      break;
    }
  }
}
1480

1481
/**
 * Refresh the catalog entries of every database and table recorded on a request.
 *
 * @param pTscObj   connection whose cluster catalog and transporter are used
 * @param pRequest  request carrying `dbList` and `tableList`
 * @return TSDB_CODE_APP_ERROR when both lists are empty, the first failing
 *         catalog status otherwise, or TSDB_CODE_SUCCESS
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  int32_t numOfTables = taosArrayGetSize(pRequest->tableList);
  if (numOfDbs <= 0 && numOfTables <= 0) {
    return TSDB_CODE_APP_ERROR;
  }

  SCatalog* pCatalog = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // Databases first; a NULL entry is passed through to the catalog as-is.
  for (int32_t idx = 0; TSDB_CODE_SUCCESS == code && idx < numOfDbs; ++idx) {
    char* dbFName = taosArrayGet(pRequest->dbList, idx);
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
  }

  // Then tables, stopping at the first failure just like the database pass.
  for (int32_t idx = 0; TSDB_CODE_SUCCESS == code && idx < numOfTables; ++idx) {
    SName* pTableName = taosArrayGet(pRequest->tableList, idx);
    code = catalogRefreshTableMeta(pCatalog, &conn, pTableName, -1);
  }

  return code;
}
1523

1524
/**
 * Drop cached catalog metadata for a list of tables or views.
 *
 * @param pTscObj  connection whose cluster catalog is modified
 * @param tbList   array of SName entries
 * @param isView   true to remove view metadata, false to remove table metadata
 * @return TSDB_CODE_SUCCESS, or the first error reported by the catalog
 */
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
  SCatalog* pCatalog = NULL;
  int32_t   numOfNames = taosArrayGetSize(tbList);
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  for (int32_t idx = 0; idx < numOfNames; ++idx) {
    SName* pName = taosArrayGet(tbList, idx);

    if (!isView) {
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pName));
      continue;
    }

    // Missing view entries are skipped silently.
    if (NULL == pName) {
      continue;
    }

    char dbFName[TSDB_DB_FNAME_LEN];
    (void)tNameGetFullDbName(pName, dbFName);
    TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pName->tname, 0));
  }

  return TSDB_CODE_SUCCESS;
}
1551

1552
/**
 * Build the mnode endpoint set from the configured first/second endpoints.
 *
 * An endpoint whose FQDN cannot be resolved is logged and left out of the set;
 * the call only fails for that reason when no endpoint at all is usable.
 *
 * @param firstEp   primary "fqdn:port" string, may be NULL or empty
 * @param secondEp  secondary "fqdn:port" string, may be NULL or empty
 * @param pEpSet    output endpoint set; version, inUse and numOfEps are reset
 * @return 0 on success; TSDB_CODE_TSC_INVALID_FQDN for an over-long or
 *         unparsable first endpoint or an over-long second endpoint; the parser
 *         status for an unparsable second endpoint;
 *         TSDB_CODE_RPC_NETWORK_UNAVAIL when no endpoint could be resolved
 */
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      // Return the real error code (as every other path does) instead of a bare -1.
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }
    uint32_t addr = 0;
    code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      // Wipe the half-filled slot so the second endpoint can reuse it.
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    uint32_t addr = 0;
    code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  // Neither endpoint was given or resolvable.
  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  return 0;
}
1610

1611
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
33,188✔
1612
                        SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1613
  *pTscObj = NULL;
33,188✔
1614
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
33,188✔
1615
  if (TSDB_CODE_SUCCESS != code) {
33,188!
UNCOV
1616
    return code;
×
1617
  }
1618

1619
  SRequestObj* pRequest = NULL;
33,188✔
1620
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
33,188✔
1621
  if (TSDB_CODE_SUCCESS != code) {
33,184!
1622
    destroyTscObj(*pTscObj);
×
UNCOV
1623
    return code;
×
1624
  }
1625

1626
  pRequest->sqlstr = taosStrdup("taos_connect");
33,184!
1627
  if (pRequest->sqlstr) {
33,184!
1628
    pRequest->sqlLen = strlen(pRequest->sqlstr);
33,184✔
1629
  } else {
UNCOV
1630
    return terrno;
×
1631
  }
1632

1633
  SMsgSendInfo* body = NULL;
33,184✔
1634
  code = buildConnectMsg(pRequest, &body);
33,184✔
1635
  if (TSDB_CODE_SUCCESS != code) {
33,159!
1636
    destroyTscObj(*pTscObj);
×
UNCOV
1637
    return code;
×
1638
  }
1639

1640
  // int64_t transporterId = 0;
1641
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body);
33,159✔
1642
  if (TSDB_CODE_SUCCESS != code) {
33,184!
1643
    destroyTscObj(*pTscObj);
×
1644
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
UNCOV
1645
    return code;
×
1646
  }
1647
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
33,184!
1648
    destroyTscObj(*pTscObj);
×
1649
    tscError("failed to wait sem, code:%s", terrstr());
×
UNCOV
1650
    return terrno;
×
1651
  }
1652
  if (pRequest->code != TSDB_CODE_SUCCESS) {
33,185✔
1653
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
63!
1654
    tscError("failed to connect to server, reason: %s", errorMsg);
63!
1655

1656
    terrno = pRequest->code;
63✔
1657
    destroyRequest(pRequest);
63✔
1658
    taos_close_internal(*pTscObj);
63✔
1659
    *pTscObj = NULL;
63✔
1660
    return terrno;
63✔
1661
  } else {
1662
    tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
33,122✔
1663
             (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1664
    destroyRequest(pRequest);
33,127✔
1665
  }
1666
  return code;
33,112✔
1667
}
1668

1669
/**
 * Allocate and fill the send-info structure for a TDMT_MND_CONNECT message.
 *
 * @param pRequest      connect request; supplies ids and the connection object
 * @param pMsgSendInfo  output; on success points to a heap-allocated SMsgSendInfo
 *                      whose `param` holds a copy of the request ref id and whose
 *                      `msgInfo` holds the serialized SConnectReq. On failure
 *                      everything allocated here is released and the output is
 *                      set to NULL.
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
  int32_t     code = TSDB_CODE_SUCCESS;
  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;
  char*       db = NULL;
  int32_t     contLen = 0;
  void*       pReq = NULL;

  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (*pMsgSendInfo == NULL) {
    return terrno;
  }

  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;

  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
  (*pMsgSendInfo)->requestId = pRequest->requestId;
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
  if (NULL == (*pMsgSendInfo)->param) {
    code = terrno;
    goto _error;
  }

  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;

  db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  } else if (terrno) {
    code = terrno;
    goto _error;
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = appInfo.pid;
  connectReq.startTime = appInfo.startTime;

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));

  // First pass computes the encoded size, second pass fills the buffer.
  contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  pReq = taosMemoryMalloc(contLen);
  if (NULL == pReq) {
    code = terrno;
    goto _error;
  }

  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
    code = terrno;
    goto _error;
  }

  (*pMsgSendInfo)->msgInfo.len = contLen;
  (*pMsgSendInfo)->msgInfo.pData = pReq;
  return TSDB_CODE_SUCCESS;

_error:
  // Never report success after tearing the message down.
  if (TSDB_CODE_SUCCESS == code) {
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
  }
  taosMemoryFree(pReq);
  taosMemoryFree((*pMsgSendInfo)->param);  // previously leaked on the later error paths
  taosMemoryFree(*pMsgSendInfo);
  *pMsgSendInfo = NULL;  // do not hand a dangling pointer back to the caller
  return code;
}
1726

1727
/**
 * Apply an endpoint-set change reported by the server to the client-side state.
 *
 * For mnode targets the application's management endpoint set is replaced; for
 * vnode targets the catalog's vgroup endpoint set is updated and the target's
 * `dbFName` is freed. Other target types are only logged.
 *
 * @param pSendInfo  send info of the message the response belongs to
 * @param pTscObj    owning connection, may be NULL (then nothing is updated)
 * @param pMsg       response message, used for logging
 * @param pEpSet     new endpoint set, may be NULL (then this is a no-op)
 */
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (NULL == pEpSet) {
    return;
  }

  switch (pSendInfo->target.type) {
    case TARGET_TYPE_MNODE: {
      if (NULL == pTscObj) {
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
        return;
      }

      SEpSet* pCurSet = &pTscObj->pAppInfo->mgmtEp.epSet;
      SEp*    pCurEp = &pCurSet->eps[pCurSet->inUse];
      SEp*    pNextEp = &pEpSet->eps[pEpSet->inUse];
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pCurSet->inUse, pCurSet->numOfEps,
               pCurEp->fqdn, pCurEp->port, pEpSet->inUse, pEpSet->numOfEps, pNextEp->fqdn, pNextEp->port);
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      break;
    }

    case TARGET_TYPE_VNODE: {
      if (NULL == pTscObj) {
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
        return;
      }

      SCatalog* pCatalog = NULL;
      int32_t   rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
      if (TSDB_CODE_SUCCESS != rc) {
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
                 tstrerror(rc));
        return;
      }

      rc = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
      if (TSDB_CODE_SUCCESS != rc) {
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
                 tstrerror(rc));
        return;
      }

      // The database name is released only after a successful update.
      taosMemoryFreeClear(pSendInfo->target.dbFName);
      break;
    }

    default: {
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
      break;
    }
  }
}
1776

1777
/**
 * Process one response from the server: apply endpoint-set updates, copy the
 * payload and invoke the response handler registered in the send info.
 *
 * Ownership: `pMsg->pCont` is always released via rpcFreeCont. `pEpSet` is freed
 * here only on the early error returns; otherwise it is passed to the handler
 * through `SDataBuf.pEpSet`. The send info is destroyed on every path except
 * when `ahandle` is NULL.
 *
 * @param pMsg    response message; `info.ahandle` must hold the SMsgSendInfo
 * @param pEpSet  heap copy of the updated endpoint set, or NULL
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INTERNAL_ERROR when the message
 *         has no send info or the request ref does not match
 */
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pMsg->info.ahandle == NULL) {
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
    rpcFreeCont(pMsg->pCont);
    taosMemoryFree(pEpSet);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  STscObj*     pTscObj = NULL;
  SRequestObj* pRequest = NULL;  // non-NULL while this function holds a request ref

  STraceId* trace = &pMsg->info.traceId;
  char      tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));

  if (pSendInfo->requestObjRefId != 0) {
    pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest) {
      if (pRequest->self != pSendInfo->requestObjRefId) {
        // Hex conversion to match the "0x" prefix (was PRId64).
        tscError("doProcessMsgFromServer req:0x%" PRIx64 " != pSendInfo->requestObjRefId:0x%" PRIx64,
                 pRequest->self, pSendInfo->requestObjRefId);

        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
          tscError("doProcessMsgFromServer taosReleaseRef failed");
        }
        rpcFreeCont(pMsg->pCont);
        taosMemoryFree(pEpSet);
        destroySendMsgInfo(pSendInfo);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pTscObj = pRequest->pTscObj;
    }
  }

  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

  SDataBuf buf = {.msgType = pMsg->msgType,
                  .len = pMsg->contLen,
                  .pData = NULL,
                  .handle = pMsg->info.handle,
                  .handleRefId = pMsg->info.refId,
                  .pEpSet = pEpSet};

  // The handler receives its own copy of the payload; the rpc buffer is freed below.
  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      pMsg->code = terrno;
    } else {
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  // Release exactly when a ref was acquired, independent of pTscObj being set.
  if (pRequest) {
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("doProcessMsgFromServer taosReleaseRef failed");
      terrno = code;
      pMsg->code = code;
    }
  }

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
  return TSDB_CODE_SUCCESS;
}
1847

1848
int32_t doProcessMsgFromServer(void* param) {
19,504,833✔
1849
  AsyncArg* arg = (AsyncArg*)param;
19,504,833✔
1850
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
19,504,833✔
1851
  taosMemoryFree(arg);
19,497,836!
1852
  return code;
19,502,718✔
1853
}
1854

1855
/**
 * RPC callback for responses from the server. The message is queued for
 * asynchronous processing; if queuing is not possible it is processed inline.
 *
 * @param parent  not used
 * @param pMsg    response message; its code may be rewritten here
 * @param pEpSet  updated endpoint set or NULL; a heap copy is handed on
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  int32_t   code = 0;
  SEpSet*   pEpSetCopy = NULL;
  AsyncArg* pArg = NULL;
  bool      isConnectRsp = (pMsg->msgType == TDMT_MND_CONNECT + 1);  // pMsg is response msg

  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);

  if (NULL != pEpSet) {
    pEpSetCopy = taosMemoryCalloc(1, sizeof(SEpSet));
    if (NULL == pEpSetCopy) {
      code = terrno;
      pMsg->code = terrno;
      goto _inline;
    }
    (void)memcpy((void*)pEpSetCopy, (void*)pEpSet, sizeof(SEpSet));
  }

  // Connect responses get the original code restored; everything else is
  // unified to TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED.
  if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
    pMsg->code = isConnectRsp ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
  } else if (isConnectRsp && pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
    pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  pArg = taosMemoryCalloc(1, sizeof(AsyncArg));
  if (NULL == pArg) {
    code = terrno;
    pMsg->code = code;
    goto _inline;
  }

  pArg->msg = *pMsg;
  pArg->pEpset = pEpSetCopy;

  code = taosAsyncExec(doProcessMsgFromServer, pArg, NULL);
  if (0 == code) {
    return;
  }

  pMsg->code = code;
  taosMemoryFree(pArg);

_inline:
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
  code = doProcessMsgFromServerImpl(pMsg, pEpSetCopy);
  if (code != 0) {
    tscError("failed to sched msg to tsc, tsc ready quit");
  }
}
1910

1911
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
4✔
1912
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
4!
1913
  if (user == NULL) {
4!
UNCOV
1914
    user = TSDB_DEFAULT_USER;
×
1915
  }
1916

1917
  if (auth == NULL) {
4!
1918
    tscError("No auth info is given, failed to connect to server");
×
UNCOV
1919
    return NULL;
×
1920
  }
1921

1922
  STscObj* pObj = NULL;
4✔
1923
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
4✔
1924
  if (TSDB_CODE_SUCCESS == code) {
4✔
1925
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1!
1926
    if (NULL == rid) {
1!
UNCOV
1927
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
1928
    }
1929
    *rid = pObj->id;
1✔
1930
    return (TAOS*)rid;
1✔
1931
  }
1932

1933
  return NULL;
3✔
1934
}
1935

1936
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
1937
//                      const char* db, int dbLen, uint16_t port) {
1938
//   char ipStr[TSDB_EP_LEN] = {0};
1939
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
1940
//   char userStr[TSDB_USER_LEN] = {0};
1941
//   char passStr[TSDB_PASSWORD_LEN] = {0};
1942
//
1943
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
1944
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
1945
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
1946
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
1947
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
1948
// }
1949

1950
/**
 * Point `row[]` / `length[]` at the cells of the current row of the result block.
 *
 * NULL cells get a NULL pointer and length 0. Variable-length cells point past
 * their length header; fixed-length cells point into the column data at
 * `schemaBytes * current`.
 *
 * @param pResultInfo  result info; `current` selects the row
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    SResultColumn* pColumn = &pResultInfo->pCol[col];

    int32_t type = pResultInfo->fields[col].type;
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->userFields[col].bytes, false);

    // Start from "NULL cell" and overwrite when the cell has a value.
    char*   pCell = NULL;
    int32_t cellLen = 0;

    if (IS_VAR_DATA_TYPE(type)) {
      if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pColumn->offset[pResultInfo->current] != -1) {
        char* pStart = pColumn->offset[pResultInfo->current] + pColumn->pData;
        cellLen = varDataLen(pStart);
        pCell = varDataVal(pStart);
      }
    } else if (!colDataIsNull_f(pColumn->nullbitmap, pResultInfo->current)) {
      pCell = pColumn->pData + schemaBytes * pResultInfo->current;
      cellLen = schemaBytes;
    }

    pResultInfo->row[col] = pCell;
    pResultInfo->length[col] = cellLen;
  }
}
35,542,427✔
1978

1979
/**
 * Synchronously fetch result rows through the scheduler.
 *
 * A new block is fetched only when the current one is missing or exhausted.
 *
 * @param pRequest        request to fetch from, may be NULL
 * @param setupOneRowPtr  when true, set up the row pointers for the current row
 *                        and advance `current`
 * @param convertUcs4     forwarded to setQueryResultFromRsp
 * @return the row pointer array, or NULL when there is no request, no more data
 *         or an error (then `pRequest->code` holds it)
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (NULL == pRequest) {
    return NULL;
  }

  SReqResultInfo* pInfo = &pRequest->body.resInfo;
  bool            needFetch = (NULL == pInfo->pData) || (pInfo->current >= pInfo->numOfRows);

  if (needFetch) {
    // All data has returned to App already, no need to try again
    if (pInfo->completed) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    SSchedulerReq req = {.syncReq = true, .pFetchRes = (void**)&pInfo->pData};

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    pRequest->code = setQueryResultFromRsp(pInfo, (const SRetrieveTableRsp*)pInfo->pData, convertUcs4,
                                           pRequest->isStmtBind);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
             pRequest->self, pInfo->numOfRows, pInfo->totalRows, pInfo->completed, pRequest->requestId);

    // Account the fetched payload in the cluster summary.
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    (void)atomic_add_fetch_64((int64_t*)&pSummary->fetchBytes, pInfo->payloadLen);

    if (0 == pInfo->numOfRows) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pInfo);
    pInfo->current += 1;
  }

  return pInfo->row;
}
2027

2028
/**
 * Fetch callback used to turn an async fetch into a blocking one: it posts the
 * semaphore passed in `param`.
 *
 * @param param      the tsem_t the waiting thread blocks on
 * @param res        unused
 * @param numOfRows  unused
 */
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
  tsem_t* pSem = (tsem_t*)param;

  int32_t rc = tsem_post(pSem);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to post sem, code:%s", terrstr());
  }
}
997,385✔
2034

2035
/**
 * Blocking fetch built on the async fetch API: issue taos_fetch_rows_a and wait
 * on a local semaphore until syncFetchFn posts it.
 *
 * @param pRequest        request to fetch from, may be NULL
 * @param setupOneRowPtr  when true, set up the row pointers for the current row
 *                        and advance `current`
 * @param convertUcs4     stored on the result info before the fetch is issued
 * @return the row pointer array, or NULL when there is no request, no more data
 *         or an error (then `pRequest->code` holds it)
 */
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (pRequest == NULL) {
    return NULL;
  }

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    // All data has returned to App already, no need to try again
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    // convert ucs4 to native multi-bytes string
    pResultInfo->convertUcs4 = convertUcs4;
    tsem_t sem;
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
      int32_t semCode = terrno;
      tscError("failed to init sem, code:%s", terrstr());
      // Posting to / waiting on an uninitialised semaphore is undefined, so
      // fail the fetch instead of carrying on.
      pRequest->code = (semCode != TSDB_CODE_SUCCESS) ? semCode : TSDB_CODE_TSC_INTERNAL_ERROR;
      pResultInfo->numOfRows = 0;
      return NULL;
    }
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
      tscError("failed to wait sem, code:%s", terrstr());
    }
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
      tscError("failed to destroy sem, code:%s", terrstr());
    }
    pRequest->inCallback = false;
  }

  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    return NULL;
  } else {
    if (setupOneRowPtr) {
      doSetOneRowPtr(pResultInfo);
      pResultInfo->current += 1;
    }

    return pResultInfo->row;
  }
}
2075

2076
/**
 * Lazily allocate the per-column bookkeeping arrays of a result info
 * (`row`, `pCol`, `length`, `convertBuf`), each with `numOfCols` entries.
 *
 * Allocation is skipped when `row` is already set. On failure every array is
 * released and its pointer reset to NULL, so a later call retries the
 * allocation instead of treating freed memory as valid.
 *
 * @param pResInfo  result info to prepare
 * @return TSDB_CODE_SUCCESS, or terrno as left by the failed allocation
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row == NULL) {
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
      int32_t code = terrno;
      // Free AND clear: the `row == NULL` test above is what triggers a retry.
      taosMemoryFreeClear(pResInfo->row);
      taosMemoryFreeClear(pResInfo->pCol);
      taosMemoryFreeClear(pResInfo->length);
      taosMemoryFreeClear(pResInfo->convertBuf);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
2094

2095
/**
 * Convert every non-empty NCHAR column of the current block from UCS-4 to the
 * multi-byte charset, writing the result into the column's `convertBuf` and
 * re-pointing the column data and row offsets at it.
 *
 * @param pResultInfo  result info holding columns, fields and convert buffers
 * @param colLength    byte length of each column in the block
 * @param isStmt       forwarded to calcSchemaBytesFromTypeBytes
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_TSC_INTERNAL_ERROR when no converter is
 *         available or a converted value does not fit; terrno on allocation failure
 */
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
  int32_t convIdx = -1;
  iconv_t conv = taosAcquireConv(&convIdx, C2M, pResultInfo->charsetCxt);
  if ((iconv_t)-1 == conv) return TSDB_CODE_TSC_INTERNAL_ERROR;

  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    int32_t type = pResultInfo->fields[col].type;
    int32_t schemaBytes =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[col].type, pResultInfo->fields[col].bytes, isStmt);

    if (TSDB_DATA_TYPE_NCHAR != type || colLength[col] <= 0) {
      continue;
    }

    char* pDst = taosMemoryRealloc(pResultInfo->convertBuf[col], colLength[col]);
    if (NULL == pDst) {
      taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
      return terrno;
    }
    pResultInfo->convertBuf[col] = pDst;

    SResultColumn* pColumn = &pResultInfo->pCol[col];
    for (int32_t row = 0; row < pResultInfo->numOfRows; ++row) {
      // -1 marks a NULL cell; nothing to convert.
      if (-1 == pColumn->offset[row]) {
        continue;
      }

      char*   pSrc = pColumn->offset[row] + pColumn->pData;
      int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pSrc), varDataLen(pSrc), varDataVal(pDst), conv);

      bool invalid = (len < 0) || (len > schemaBytes) ||
                     ((pDst + len) >= (pResultInfo->convertBuf[col] + colLength[col]));
      if (invalid) {
        tscError(
            "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
            "colLength[i]):%p",
            len, schemaBytes, (pDst + len), (pResultInfo->convertBuf[col] + colLength[col]));
        taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      // Converted cells are packed back to back; offsets are rewritten to match.
      varDataSetLen(pDst, len);
      pColumn->offset[row] = (pDst - pResultInfo->convertBuf[col]);
      pDst += (len + VARSTR_HEADER_SIZE);
    }

    pResultInfo->pCol[col].pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pResultInfo->pCol[col].pData;
  }

  taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
  return TSDB_CODE_SUCCESS;
}
2141

2142
/**
 * Render every decimal column of the current block as text.
 *
 * Each value is written into a fixed 64-byte slot of the column's `convertBuf`;
 * the field byte widths are set to 64 and the column data / row pointer are
 * redirected to the buffer.
 *
 * @param pResultInfo  result info holding columns, fields and convert buffers
 * @return 0 on success, terrno on allocation failure, or the decimalToStr error
 */
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    TAOS_FIELD_E* pFieldE = &pResultInfo->fields[col];
    TAOS_FIELD*   pField = &pResultInfo->userFields[col];
    int32_t       type = pFieldE->type;

    // Only decimal columns that actually carry data are converted.
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[col].pData) {
      continue;
    }

    int32_t bufLen = 64;
    char*   pOut = taosMemoryRealloc(pResultInfo->convertBuf[col], bufLen * pResultInfo->numOfRows);
    pFieldE->bytes = bufLen;
    pField->bytes = bufLen;
    if (!pOut) return terrno;
    pResultInfo->convertBuf[col] = pOut;

    for (int32_t row = 0; row < pResultInfo->numOfRows; ++row, pOut += bufLen) {
      DecimalWord* pValue = (DecimalWord*)(pResultInfo->pCol[col].pData + row * tDataTypes[type].bytes);
      int32_t      code = decimalToStr(pValue, type, pFieldE->precision, pFieldE->scale, pOut, bufLen);
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
    }

    pResultInfo->pCol[col].pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pResultInfo->pCol[col].pData;
  }
  return 0;
}
2173

2174
/**
 * Size in bytes of the fixed metadata that precedes the per-column length array
 * in a result block: two int32 fields, three more int32 fields, one uint64
 * field, then one (int8 + int32) entry per column.
 *
 * @param p          block start; not read
 * @param numOfCols  number of columns in the block
 * @return metadata size in bytes
 */
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
  (void)p;

  size_t fixedBytes = 2 * sizeof(int32_t) + 3 * sizeof(int32_t) + sizeof(uint64_t);
  size_t perColBytes = sizeof(int8_t) + sizeof(int32_t);

  return fixedBytes + numOfCols * perColBytes;
}
2178

2179
/*
 * Estimate how many bytes the block in pResultInfo->pData needs once every
 * JSON column has been rewritten as text (the rewrite itself is done by
 * doConvertJson(), which allocates a buffer of exactly this size).
 *
 * Block layout walked here:
 * | version | total length | total rows | total columns | flag seg | block group id | column schema |
 * | each column length | column data ... |
 *
 * For JSON columns the larger of the original column length and the estimated
 * text length is used; all other columns keep their original length.
 *
 * Returns the estimated size (> 0) on success. On failure a non-positive value
 * is returned (TSDB_CODE_TSC_INTERNAL_ERROR when the column count in the block
 * header disagrees with pResultInfo, -1 for an unknown inner JSON value type).
 */
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
  // doConvertJson() formats doubles with "%.9lf" (fixed notation). For the largest
  // finite double that is: sign (1) + 309 integer digits + '.' (1) + 9 fraction
  // digits = 320 characters. Reserving less lets the converted text outgrow the
  // buffer sized from this estimate.
  const int32_t maxDoubleStrLen = 320;

  char*   p = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)p;

  int32_t numOfRows = pResultInfo->numOfRows;
  int32_t numOfCols = pResultInfo->numOfCols;

  // the column count is the fourth int32 of the block header
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
  if (numOfCols != cols) {
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
  int32_t* colLength = (int32_t*)(p + len);
  len += sizeof(int32_t) * numOfCols;

  char* pStart = p + len;
  for (int32_t i = 0; i < numOfCols; ++i) {
    // version-1 blocks store the column lengths in network byte order
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];

    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      int32_t* offset = (int32_t*)pStart;
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
      len += lenTmp;
      pStart += lenTmp;

      int32_t estimateColLen = 0;
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {  // row has no value in this column
          continue;
        }
        char* data = offset[j] + pStart;

        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
        } else if (tTagIsJson(data)) {
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          estimateColLen += (VARSTR_HEADER_SIZE + maxDoubleStrLen);
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          estimateColLen += (VARSTR_HEADER_SIZE + 5);  // "false" is the longest rendering
        } else {
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
          return -1;
        }
      }
      len += TMAX(colLen, estimateColLen);
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      // variable-length column: offset array followed by the payload
      int32_t lenTmp = numOfRows * sizeof(int32_t);
      len += (lenTmp + colLen);
      pStart += lenTmp;
    } else {
      // fixed-length column: null bitmap followed by the payload
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
      len += (lenTmp + colLen);
      pStart += lenTmp;
    }
    pStart += colLen;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  len += sizeof(bool);
  return len;
}
2250

2251
/*
 * Rewrite the JSON columns of the block in pResultInfo->pData as text.
 *
 * When at least one field is of type TSDB_DATA_TYPE_JSON, a new buffer
 * (pResultInfo->convertJson) sized by estimateJsonLen() is allocated, the block
 * header and column-length array are copied, and every column is copied over.
 * Each JSON value is replaced by a var-string: the null literal, the serialized
 * JSON object, a quoted multi-byte string, a "%.9lf" number, or "true"/"false".
 * Offsets and column lengths of JSON columns are updated, the total length in
 * the new header is patched, and pResultInfo->pData is pointed at the new buffer.
 *
 * Returns TSDB_CODE_SUCCESS (also when nothing needs converting), terrno on
 * allocation/serialization failure, or TSDB_CODE_TSC_INTERNAL_ERROR on
 * inconsistent block contents.
 */
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
  int32_t numOfRows = pResultInfo->numOfRows;
  int32_t numOfCols = pResultInfo->numOfCols;

  // nothing to do unless some column is JSON
  bool hasJsonCol = false;
  for (int32_t c = 0; c < numOfCols && !hasJsonCol; ++c) {
    hasJsonCol = (pResultInfo->fields[c].type == TSDB_DATA_TYPE_JSON);
  }
  if (!hasJsonCol) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to convert form json format string");

  char*   pSrc = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)pSrc;
  int32_t dataLen = estimateJsonLen(pResultInfo);
  if (dataLen <= 0) {
    tscError("doConvertJson error: estimateJsonLen failed");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // replace any buffer left from a previous conversion
  taosMemoryFreeClear(pResultInfo->convertJson);
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
  if (pResultInfo->convertJson == NULL) return terrno;
  char* pDst = pResultInfo->convertJson;

  int32_t totalLen = 0;
  int32_t cols = *(int32_t*)(pSrc + sizeof(int32_t) * 3);
  if (numOfCols != cols) {
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // block header + column schema
  int32_t len = getVersion1BlockMetaSize(pSrc, numOfCols);
  (void)memcpy(pDst, pSrc, len);
  pSrc += len;
  pDst += len;
  totalLen += len;

  // per-column length array; the copy is patched later for JSON columns
  len = sizeof(int32_t) * numOfCols;
  int32_t* srcColLen = (int32_t*)pSrc;
  int32_t* dstColLen = (int32_t*)pDst;
  (void)memcpy(pDst, pSrc, len);
  pSrc += len;
  pDst += len;
  totalLen += len;

  char* pSrcCol = pSrc;
  char* pDstCol = pDst;
  for (int32_t i = 0; i < numOfCols; ++i) {
    // version-1 blocks keep the lengths in network byte order
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(srcColLen[i]) : srcColLen[i];
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(dstColLen[i]) : dstColLen[i];
    if (colLen >= dataLen) {
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }

    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      int32_t* srcOffsets = (int32_t*)pSrcCol;
      int32_t* dstOffsets = (int32_t*)pDstCol;

      // copy the offset array first; entries of non-null rows are rewritten below
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pDstCol, pSrcCol, len);
      pSrcCol += len;
      pDstCol += len;
      totalLen += len;

      len = 0;  // running length of the converted column payload
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (srcOffsets[j] == -1) {
          continue;
        }
        char* data = srcOffsets[j] + pSrcCol;

        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};

        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (tTagIsJson(data)) {
          char* jsonString = NULL;
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
          if (jsonString == NULL) {
            tscError("doConvertJson error: parseTagDatatoJson failed");
            return terrno;
          }
          STR_TO_VARSTR(dst, jsonString);
          taosMemoryFree(jsonString);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          *(char*)varDataVal(dst) = '\"';
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
                                         varDataVal(dst) + CHAR_BYTES, pResultInfo->charsetCxt);
          if (length <= 0) {
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
            length = 0;  // fall back to an empty quoted string
          }
          varDataSetLen(dst, length + CHAR_BYTES * 2);
          *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          double jsonVd = *(double*)(jsonInnerData);
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else {
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        // append the converted value and record where it starts
        dstOffsets[j] = len;
        (void)memcpy(pDstCol + len, dst, varDataTLen(dst));
        len += varDataTLen(dst);
      }

      colLen1 = len;
      totalLen += colLen1;
      dstColLen[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      // offset array, then the payload, both copied verbatim
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pDstCol, pSrcCol, len);
      pSrcCol += len;
      pDstCol += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pDstCol, pSrcCol, colLen);
    } else {
      // null bitmap, then the payload, both copied verbatim
      len = BitmapLen(pResultInfo->numOfRows);
      (void)memcpy(pDstCol, pSrcCol, len);
      pSrcCol += len;
      pDstCol += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pDstCol, pSrcCol, colLen);
    }

    pSrcCol += colLen;
    pDstCol += colLen1;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side. The byte itself is not copied;
  // the destination buffer was zero-allocated.
  totalLen += sizeof(bool);

  // patch the "total length" header word of the new block and switch over to it
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
  pResultInfo->pData = pResultInfo->convertJson;
  return TSDB_CODE_SUCCESS;
}
2404

2405
/*
 * Parse the result block in pResultInfo->pData and set up the per-column
 * pointers (offset array or null bitmap, data pointer, row pointer, length).
 *
 * Steps: validate arguments, prepare the result pointers, convert JSON columns,
 * walk the block header (version, data length, rows, columns, flag segment,
 * group id, column schema, column lengths), then locate every column. Column
 * lengths of version-1 blocks are byte-swapped in place with htonl(). When
 * convertUcs4 is set, the UCS4 conversion (unless compiled out) and the
 * decimal-to-text conversion are applied afterwards.
 *
 * Returns TSDB_CODE_SUCCESS (immediately when the block has no rows),
 * TSDB_CODE_TSC_INTERNAL_ERROR on invalid arguments or inconsistent block
 * contents, or the error of a failing conversion step.
 */
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
  bool convertForDecimal = convertUcs4;  // decimal conversion follows the same switch

  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
    tscError("setResultDataPtr paras error");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pResultInfo->numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pResultInfo->pData == NULL) {
    tscError("setResultDataPtr error: pData is NULL");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = doConvertJson(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // doConvertJson() may have replaced pData, so read it only now
  char* pCur = (char*)pResultInfo->pData;

  // version:
  int32_t blockVersion = *(int32_t*)pCur;
  pCur += sizeof(int32_t);

  int32_t dataLen = *(int32_t*)pCur;
  pCur += sizeof(int32_t);

  int32_t rows = *(int32_t*)pCur;
  pCur += sizeof(int32_t);

  int32_t cols = *(int32_t*)pCur;
  pCur += sizeof(int32_t);

  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // flag segment and group id are read to step over them; their values are not used here
  int32_t hasColumnSeg = *(int32_t*)pCur;
  pCur += sizeof(int32_t);

  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)pCur);
  pCur += sizeof(uint64_t);

  // check fields
  for (int32_t c = 0; c < pResultInfo->numOfCols; ++c) {
    int8_t type = *(int8_t*)pCur;
    pCur += sizeof(int8_t);

    int32_t bytes = *(int32_t*)pCur;
    pCur += sizeof(int32_t);

    // decimal precision/scale not known yet: take them from the schema bytes
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[c].precision == 0) {
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[c].precision, &pResultInfo->fields[c].scale);
    }
  }

  int32_t* colLength = (int32_t*)pCur;
  pCur += sizeof(int32_t) * pResultInfo->numOfCols;

  char* pColStart = pCur;
  for (int32_t c = 0; c < pResultInfo->numOfCols; ++c) {
    if ((pColStart - pResultInfo->pData) >= dataLen) {
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }

    // version-1 blocks keep the lengths in network byte order; swap in place
    if (blockVersion == BLOCK_VERSION_1) {
      colLength[c] = htonl(colLength[c]);
    }

    if (colLength[c] >= dataLen) {
      tscError("invalid colLength %d, dataLen %d", colLength[c], dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }

    if (IS_INVALID_TYPE(pResultInfo->fields[c].type)) {
      tscError("invalid type %d", pResultInfo->fields[c].type);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }

    // variable-length columns start with an offset array, others with a null bitmap
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[c].type)) {
      pResultInfo->pCol[c].offset = (int32_t*)pColStart;
      pColStart += pResultInfo->numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[c].nullbitmap = pColStart;
      pColStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[c].pData = pColStart;
    pResultInfo->length[c] =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[c].type, pResultInfo->fields[c].bytes, isStmt);
    pResultInfo->row[c] = pResultInfo->pCol[c].pData;

    pColStart += colLength[c];
  }

  pCur = pColStart;
  // bool blankFill = *(bool*)pCur;
  pCur += sizeof(bool);
  int32_t offset = pCur - pResultInfo->pData;
  if (offset > dataLen) {
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
  }
#endif
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
    code = convertDecimalType(pResultInfo);
  }
  return code;
}
2524

2525
/*
 * Return a heap copy of the connection's current database name.
 *
 * terrno is reset to TSDB_CODE_SUCCESS first. The name is read while holding
 * pObj->mutex. Returns NULL when no database is set (empty name) or when the
 * duplication fails (an error is logged in that case); otherwise the caller
 * receives the duplicated string.
 */
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;

  terrno = TSDB_CODE_SUCCESS;

  (void)taosThreadMutexLock(&pObj->mutex);
  if (strlen(pObj->db) > 0) {
    dbCopy = taosStrndup(pObj->db, tListLen(pObj->db));
    if (NULL == dbCopy) {
      tscError("failed to taosStrndup db name");
    }
  }
  (void)taosThreadMutexUnlock(&pObj->mutex);

  return dbCopy;
}
2540

2541
/*
 * Store db as the current database of the connection.
 *
 * The copy into pTscObj->db is bounded by the size of that array and is made
 * while holding pTscObj->mutex. A NULL connection or NULL name is logged and
 * ignored.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  if (NULL == db || NULL == pTscObj) {
    tscError("setConnectionDB para is NULL");
    return;
  }

  (void)taosThreadMutexLock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
}
2551

2552
/*
 * Clear the current database of the connection by turning pTscObj->db into an
 * empty string under pTscObj->mutex. A NULL connection is ignored.
 */
void resetConnectDB(STscObj* pTscObj) {
  if (NULL != pTscObj) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = 0;
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
  }
}
2561

2562
/*
 * Install a retrieve response as the current result block of pResultInfo.
 *
 * The previously held response message is freed and replaced by pRsp, the row
 * count / completion flag / precision are taken from the response, and the
 * payload is decompressed into pResultInfo->decompBuf when the response is
 * marked compressed and the compressed length is smaller than the raw length.
 * Finally setResultDataPtr() is called to set up the column pointers.
 *
 * The payload starts with two int32 words (compressed length, raw length)
 * followed by the data.
 *
 * Returns the result of setResultDataPtr() on success, terrno when the
 * decompress buffer cannot be allocated, or TSDB_CODE_TSC_INTERNAL_ERROR /
 * the decompression error on inconsistent input.
 */
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, bool isStmt) {
  if (pResultInfo == NULL || pRsp == NULL) {
    tscError("setQueryResultFromRsp paras is null");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->pRspMsg);
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->precision = pRsp->precision;

  // decompress data if needed
  int32_t payloadLen = htonl(pRsp->payloadLen);

  if (pRsp->compressed) {
    // make sure the decompress buffer can hold at least payloadLen bytes
    if (pResultInfo->decompBuf == NULL) {
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
      if (pResultInfo->decompBuf == NULL) {
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
        return terrno;
      }
      pResultInfo->decompBufSize = payloadLen;
    } else {
      if (pResultInfo->decompBufSize < payloadLen) {
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
        if (p == NULL) {
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
          return terrno;
        }

        pResultInfo->decompBuf = p;
        pResultInfo->decompBufSize = payloadLen;
      }
    }
  }

  if (payloadLen > 0) {
    int32_t compLen = *(int32_t*)pRsp->data;
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));

    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;

    if (pRsp->compressed && compLen < rawLen) {
      // rawLen comes from the payload itself while the buffer was sized from
      // payloadLen; refuse to decompress past the end of the buffer.
      if (rawLen < 0 || rawLen > pResultInfo->decompBufSize) {
        tscError("invalid rawLen:%d, decompress buffer size:%" PRId64, rawLen, (int64_t)pResultInfo->decompBufSize);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      if (len < 0) {
        tscError("tsDecompressString failed");
        return terrno ? terrno : TSDB_CODE_FAILED;
      }
      if (len != rawLen) {
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pResultInfo->pData = pResultInfo->decompBuf;
      pResultInfo->payloadLen = rawLen;
    } else {
      // data is used in place; both length fields of the response must agree
      pResultInfo->pData = pStart;
      pResultInfo->payloadLen = htonl(pRsp->compLen);
      if (pRsp->compLen != pRsp->payloadLen) {
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
    }
  }

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
  return code;
}
2634

2635
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
5✔
2636
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
5✔
2637
  void*              clientRpc = NULL;
5✔
2638
  SServerStatusRsp   statusRsp = {0};
5✔
2639
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
5✔
2640
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
5✔
2641
  SRpcMsg  rpcRsp = {0};
5✔
2642
  SRpcInit rpcInit = {0};
5✔
2643
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
5✔
2644

2645
  rpcInit.label = "CHK";
5✔
2646
  rpcInit.numOfThreads = 1;
5✔
2647
  rpcInit.cfp = NULL;
5✔
2648
  rpcInit.sessions = 16;
5✔
2649
  rpcInit.connType = TAOS_CONN_CLIENT;
5✔
2650
  rpcInit.idleTime = tsShellActivityTimer * 1000;
5✔
2651
  rpcInit.compressSize = tsCompressMsgSize;
5✔
2652
  rpcInit.user = "_dnd";
5✔
2653

2654
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
5✔
2655
  connLimitNum = TMAX(connLimitNum, 10);
5✔
2656
  connLimitNum = TMIN(connLimitNum, 500);
5✔
2657
  rpcInit.connLimitNum = connLimitNum;
5✔
2658
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
5✔
2659
  rpcInit.readTimeout = tsReadTimeout;
5✔
2660
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
5!
2661
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
UNCOV
2662
    goto _OVER;
×
2663
  }
2664

2665
  clientRpc = rpcOpen(&rpcInit);
5✔
2666
  if (clientRpc == NULL) {
5!
2667
    code = terrno;
×
2668
    tscError("failed to init server status client since %s", tstrerror(code));
×
UNCOV
2669
    goto _OVER;
×
2670
  }
2671

2672
  if (fqdn == NULL) {
5!
2673
    fqdn = tsLocalFqdn;
5✔
2674
  }
2675

2676
  if (port == 0) {
5!
2677
    port = tsServerPort;
5✔
2678
  }
2679

2680
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
5✔
2681
  epSet.eps[0].port = (uint16_t)port;
5✔
2682
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
5✔
2683
  if (TSDB_CODE_SUCCESS != ret) {
5!
2684
    tscError("failed to send recv since %s", tstrerror(ret));
×
UNCOV
2685
    goto _OVER;
×
2686
  }
2687

2688
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
5!
2689
    tscError("failed to send server status req since %s", terrstr());
1!
2690
    goto _OVER;
1✔
2691
  }
2692

2693
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
4!
2694
    tscError("failed to parse server status rsp since %s", terrstr());
×
UNCOV
2695
    goto _OVER;
×
2696
  }
2697

2698
  code = statusRsp.statusCode;
4✔
2699
  if (details != NULL) {
4!
2700
    tstrncpy(details, statusRsp.details, maxlen);
4✔
2701
  }
2702

UNCOV
2703
_OVER:
×
2704
  if (clientRpc != NULL) {
5!
2705
    rpcClose(clientRpc);
5✔
2706
  }
2707
  if (rpcRsp.pCont != NULL) {
5✔
2708
    rpcFreeCont(rpcRsp.pCont);
4✔
2709
  }
2710
  return code;
5✔
2711
}
2712

2713
/*
 * Add one table name to the per-database request map.
 *
 * str holds the text being parsed. When len2 > 0 the name is "db.table":
 * (pos1, len1) is the database part and (pos2, len2) the table part.
 * Otherwise (pos1, len1) is the table name and db supplies the database.
 * The table is appended to the STablesReq stored in pHash under
 * "<acctId>.<dbName>"; a new entry is created when none exists yet.
 *
 * Returns TSDB_CODE_SUCCESS on success, -1 for an empty or invalid name, or an
 * error code when an allocation or hash insertion fails. The table array of a
 * newly created entry is released again when the entry cannot be stored.
 */
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
                      int32_t acctId, char* db) {
  SName name = {0};

  if (len1 <= 0) {
    return -1;
  }

  const char* dbName = db;
  const char* tbName = NULL;
  int32_t     dbLen = 0;
  int32_t     tbLen = 0;
  if (len2 > 0) {
    // fully qualified: both parts come from the parsed text
    dbName = str + pos1;
    dbLen = len1;
    tbName = str + pos2;
    tbLen = len2;
  } else {
    // table only: the database comes from the caller
    if (NULL == db) {
      return -1;
    }
    dbLen = strlen(db);
    tbName = str + pos1;
    tbLen = len1;
  }

  if (dbLen <= 0 || tbLen <= 0) {
    return -1;
  }

  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
    return -1;
  }

  if (tNameAddTbName(&name, tbName, tbLen)) {
    return -1;
  }

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);

  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
  if (pDb) {
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
  } else {
    // zero-initialised so that no indeterminate bytes are copied into the hash
    STablesReq newDb = {0};
    newDb.pTables = taosArrayInit(20, sizeof(SName));
    if (NULL == newDb.pTables) {
      return terrno;
    }
    tstrncpy(newDb.dbFName, dbFName, TSDB_DB_FNAME_LEN);
    if (NULL == taosArrayPush(newDb.pTables, &name)) {
      int32_t pushCode = terrno;  // keep the error across the cleanup call
      taosArrayDestroy(newDb.pTables);
      return pushCode;
    }

    int32_t putCode = taosHashPut(pHash, dbFName, strlen(dbFName), &newDb, sizeof(newDb));
    if (TSDB_CODE_SUCCESS != putCode) {
      // the hash did not take the entry, so the array is still owned here
      taosArrayDestroy(newDb.pTables);
    }
    TSC_ERR_RET(putCode);
  }

  return TSDB_CODE_SUCCESS;
}
2771

2772
/*
 * Parse a comma separated list of table names into per-database requests.
 *
 * tbList - NUL-terminated text such as "t1, db2.t2, `my tb`". Names consist
 *          of letters, digits and '_' unless enclosed in back-quotes; a '.'
 *          separates database and table; blanks, tabs and line breaks end a name.
 * acctId - account id used to build the full database name.
 * dbName - database used for names without an explicit database part.
 * pReq   - receives a new array of STablesReq, one element per database.
 *
 * Returns TSDB_CODE_SUCCESS on success. On any parse or allocation failure the
 * table arrays collected so far are destroyed, terrno is set to
 * TSDB_CODE_TSC_INVALID_OPERATION and that value is returned.
 */
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return terrno;
  }

  bool    inEscape = false;
  int32_t code = 0;
  void*   pIter = NULL;

  // vPos/vLen describe up to two name parts (index 0 and 1); vPos < 0 means "not started"
  int32_t vIdx = 0;
  int32_t vPos[2];
  int32_t vLen[2];

  (void)memset(vPos, -1, sizeof(vPos));
  (void)memset(vLen, 0, sizeof(vLen));

  for (int32_t i = 0;; ++i) {
    if (0 == *(tbList + i)) {
      // end of input: close the part in progress and flush the last name
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      break;
    }

    if ('`' == *(tbList + i)) {
      inEscape = !inEscape;
      if (!inEscape) {
        // closing back-quote: an empty quoted name is rejected
        if (vPos[vIdx] >= 0) {
          vLen[vIdx] = i - vPos[vIdx];
        } else {
          goto _return;
        }
      }

      continue;
    }

    if (inEscape) {
      // inside back-quotes every character belongs to the name
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    if ('.' == *(tbList + i)) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      vIdx++;
      if (vIdx >= 2) {  // at most "db.table"
        goto _return;
      }
      continue;
    }

    if (',' == *(tbList + i)) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      // start over for the next name
      (void)memset(vPos, -1, sizeof(vPos));
      (void)memset(vLen, 0, sizeof(vLen));
      vIdx = 0;
      continue;
    }

    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      continue;
    }

    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
      // a name character after the part was already closed (e.g. "a b") is invalid
      if (vLen[vIdx] > 0) {
        goto _return;
      }
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    goto _return;
  }

  int32_t dbNum = taosHashGetSize(pHash);
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
  // check the freshly created array, not the address of the out-parameter
  if (NULL == *pReq) {
    TSC_ERR_JRET(terrno);
  }
  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    if (NULL == taosArrayPush(*pReq, pDb)) {
      TSC_ERR_JRET(terrno);
    }
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  return TSDB_CODE_SUCCESS;

_return:

  terrno = TSDB_CODE_TSC_INVALID_OPERATION;

  // release the table arrays held by the hash entries before dropping the hash
  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    taosArrayDestroy(pDb->pTables);
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  return terrno;
}
2910

2911
/*
 * Completion callback for a synchronous catalog request: stores code in the
 * waiting request and posts the semaphore in param (an SSyncQueryParam).
 * pResult is not used. A failing tsem_post() is only logged.
 */
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
  SSyncQueryParam* pSync = param;

  pSync->pRequest->code = code;

  int32_t postCode = tsem_post(&pSync->sem);
  if (postCode != TSDB_CODE_SUCCESS) {
    tscError("failed to post semaphore since %s", tstrerror(terrno));
  }
}
4✔
2919

2920
void syncQueryFn(void* param, void* res, int32_t code) {
11,213,421✔
2921
  SSyncQueryParam* pParam = param;
11,213,421✔
2922
  pParam->pRequest = res;
11,213,421✔
2923

2924
  if (pParam->pRequest) {
11,213,421✔
2925
    pParam->pRequest->code = code;
11,211,913✔
2926
    clientOperateReport(pParam->pRequest);
11,211,913✔
2927
  }
2928

2929
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
11,214,738!
UNCOV
2930
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2931
  }
2932
}
11,219,537✔
2933

2934
/*
 * Start an asynchronous query on connection connId.
 *
 * fp is invoked with (param, NULL, terrno) when the request cannot be started:
 * sql is NULL (TSDB_CODE_INVALID_PARA), sql is longer than
 * TSDB_MAX_ALLOWED_SQL_LEN (TSDB_CODE_TSC_EXCEED_SQL_LIMIT), or buildRequest()
 * fails. When fp itself is NULL only terrno is set. Otherwise the request is
 * tagged with source and fp and handed to doAsyncQuery().
 */
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                        int8_t source) {
  // without a callback there is nobody to report to
  if (NULL == fp) {
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->source = source;
  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
}
2967

UNCOV
2968
/*
 * Same flow as taosAsyncQueryImpl, but the request is built with the
 * caller-supplied request id `reqid` and no `source` is recorded.
 *
 * connId       - connection reference id
 * sql          - NUL-terminated statement; must not be NULL
 * fp           - completion callback; must not be NULL
 * param        - opaque value passed through to buildRequest and to `fp`
 * validateOnly - forwarded to buildRequest
 * reqid        - request id forwarded to buildRequest
 *
 * On any early failure terrno is set and, when `fp` is usable, it is invoked
 * as fp(param, NULL, terrno).
 */
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                                 int64_t reqid) {
  if (NULL == fp) {
    // nobody to notify, only record the error
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  const size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid,
             TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);

  SRequestObj* pNewReq = NULL;
  int32_t      buildCode = buildRequest(connId, sql, sqlLen, param, validateOnly, &pNewReq, reqid);
  if (TSDB_CODE_SUCCESS != buildCode) {
    terrno = buildCode;
    fp(param, NULL, terrno);
    return;
  }

  pNewReq->body.queryFp = fp;
  doAsyncQuery(pNewReq, false);
}
3001

3002
/*
 * Blocking query: issues the statement through taosAsyncQueryImpl with
 * syncQueryFn as the callback and waits on a semaphore until it completes.
 *
 * taos         - connection handle; NULL yields TSDB_CODE_TSC_DISCONNECTED in terrno
 * sql          - statement text, forwarded unchanged
 * validateOnly - forwarded unchanged
 * source       - forwarded unchanged
 *
 * Returns the request object delivered to the callback (marked as a sync
 * query), or NULL when the handle is NULL, the wait parameter cannot be
 * allocated or initialised, the wait fails, or no request was produced.
 */
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
  if (taos == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* pSync = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (pSync == NULL) {
    return NULL;
  }

  if (tsem_init(&pSync->sem, 0, 0) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, pSync, validateOnly, source);

  if (tsem_wait(&pSync->sem) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  // a failed destroy is logged only; the query result is still returned
  int32_t destroyCode = tsem_destroy(&pSync->sem);
  if (destroyCode != TSDB_CODE_SUCCESS) {
    tscError("failed to destroy semaphore since %s", tstrerror(destroyCode));
  }

  SRequestObj* pResult = pSync->pRequest;
  if (pResult != NULL) {
    pResult->syncQuery = true;
    pResult->inCallback = false;
  }

  taosMemoryFree(pSync);
  return pResult;
}
3042

3043
/*
 * Blocking query with a caller-supplied request id: issues the statement
 * through taosAsyncQueryImplWithReqid with syncQueryFn as the callback and
 * waits on a semaphore until it completes.
 *
 * taos         - connection handle; NULL yields TSDB_CODE_TSC_DISCONNECTED in terrno
 * sql          - statement text, forwarded unchanged
 * validateOnly - forwarded unchanged
 * reqid        - request id, forwarded unchanged
 *
 * Returns the request object delivered to the callback (marked as a sync
 * query), or NULL when the handle is NULL, the wait parameter cannot be
 * allocated or initialised, the wait fails, or no request was produced.
 */
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (param == NULL) {
    return NULL;
  }
  int32_t code = tsem_init(&param->sem, 0, 0);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
  code = tsem_wait(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  // release the semaphore created by tsem_init above, as taosQueryImpl does;
  // a failed destroy is logged only and the query result is still returned
  code = tsem_destroy(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  SRequestObj* pRequest = NULL;
  if (param->pRequest != NULL) {
    param->pRequest->syncQuery = true;
    pRequest = param->pRequest;
    // doRequestCallback clears inCallback only after the callback that posted
    // the semaphore has returned, so this thread can get here while the flag
    // is still set; clear it before returning, matching taosQueryImpl
    param->pRequest->inCallback = false;
  }
  taosMemoryFree(param);

  return pRequest;
}
3077

3078
static void fetchCallback(void* pResult, void* param, int32_t code) {
1,093,124✔
3079
  SRequestObj* pRequest = (SRequestObj*)param;
1,093,124✔
3080

3081
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,093,124✔
3082

3083
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
1,093,124✔
3084
           pRequest->requestId);
3085

3086
  pResultInfo->pData = pResult;
1,093,104✔
3087
  pResultInfo->numOfRows = 0;
1,093,104✔
3088

3089
  if (code != TSDB_CODE_SUCCESS) {
1,093,104!
3090
    pRequest->code = code;
×
3091
    taosMemoryFreeClear(pResultInfo->pData);
×
3092
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
UNCOV
3093
    return;
×
3094
  }
3095

3096
  if (pRequest->code != TSDB_CODE_SUCCESS) {
1,093,104!
3097
    taosMemoryFreeClear(pResultInfo->pData);
×
3098
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
UNCOV
3099
    return;
×
3100
  }
3101

3102
  pRequest->code =
1,093,126✔
3103
      setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, pRequest->isStmtBind);
1,093,104✔
3104
  if (pRequest->code != TSDB_CODE_SUCCESS) {
1,093,126✔
3105
    pResultInfo->numOfRows = 0;
1✔
3106
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(pRequest->code),
1!
3107
             pRequest->requestId);
3108
  } else {
3109
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
1,093,125✔
3110
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
3111
             pRequest->requestId);
3112

3113
    STscObj*            pTscObj = pRequest->pTscObj;
1,093,127✔
3114
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
1,093,127✔
3115
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
1,093,127✔
3116
  }
3117

3118
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
1,093,149✔
3119
}
3120

3121
/*
 * Fetch the next batch of rows for `pRequest` and report it through `fp`.
 *
 * pRequest - request whose results are fetched
 * fp       - user fetch callback, stored on the request
 * param    - user value, stored in the request's internal parameter and
 *            passed to `fp`
 *
 * `fp` is invoked directly when the request has no result fields, already
 * carries an error, or its result set is marked completed. Otherwise an
 * asynchronous scheduler fetch is issued with fetchCallback as the handler.
 */
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
  SReqResultInfo* pInfo = &pRequest->body.resInfo;

  pRequest->body.fetchFp = fp;
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;

  // this query has no results or error exists, return directly
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    pInfo->numOfRows = 0;
    pRequest->body.fetchFp(param, pRequest, pInfo->numOfRows);
    return;
  }

  // all data has returned to App already, no need to try again
  if (pInfo->completed) {
    if (QUERY_EXEC_MODE_SCHEDULE == pRequest->body.execMode) {
      pInfo->numOfRows = 0;
    } else if (pInfo->localResultFetched) {
      // locally executed query whose result was already handed out once
      pInfo->numOfRows = 0;
      pInfo->current = 0;
    } else {
      // locally executed query: first fetch reports the rows already present
      pInfo->localResultFetched = true;
    }

    pRequest->body.fetchFp(param, pRequest, pInfo->numOfRows);
    return;
  }

  SSchedulerReq fetchReq = {
      .syncReq = false,
      .fetchFp = fetchCallback,
      .cbParam = pRequest,
  };

  int32_t fetchCode = schedulerFetchRows(pRequest->body.queryJob, &fetchReq);
  if (fetchCode != TSDB_CODE_SUCCESS) {
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
    // NOTE(review): the user callback is deliberately not invoked here (the
    // call was previously commented out); presumably the scheduler reports
    // the failure through fetchCallback - confirm before changing.
  }
}
3164

3165
/*
 * Deliver the final query status of `pRequest` to its registered query
 * callback.
 *
 * pRequest - request that finished
 * code     - completion code; when tsQueryTbNotExistAsEmpty is set and the
 *            request is a query result, a "table not exist" code is turned
 *            into success with an empty-result request type
 *
 * inCallback is raised for the duration of the user callback and lowered
 * afterwards if the request can still be acquired.
 */
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
  pRequest->inCallback = true;
  const int64_t selfRef = pRequest->self;

  const bool tableMissing = (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) || (TSDB_CODE_TDB_TABLE_NOT_EXIST == code);
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && tableMissing) {
    code = TSDB_CODE_SUCCESS;
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
  }

  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
           pRequest);

  if (NULL != pRequest->body.queryFp) {
    SSyncQueryParam* pInter = (SSyncQueryParam*)pRequest->body.interParam;
    pRequest->body.queryFp(pInter->userParam, pRequest, code);
  }

  // the request is looked up again by its saved reference id instead of being
  // touched through pRequest; presumably the callback may have released it
  SRequestObj* pAlive = acquireRequest(selfRef);
  if (NULL == pAlive) {
    return;
  }

  pAlive->inCallback = false;
  (void)releaseRequest(selfRef);
}
11,209,337✔
3187

3188
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
1,204✔
3189
                       SParseSqlRes* pRes) {
3190
#ifndef TD_ENTERPRISE
3191
  return TSDB_CODE_SUCCESS;
3192
#else
3193
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
1,204✔
3194
#endif
3195
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc