• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.14
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "scheduler.h"
22
#include "tdatablock.h"
23
#include "tdataformat.h"
24
#include "tdef.h"
25
#include "tglobal.h"
26
#include "tmsgtype.h"
27
#include "tpagedbuf.h"
28
#include "tref.h"
29
#include "tsched.h"
30
#include "tversion.h"
31
#include "decimal.h"
32

33
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
34
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
35

36
void setQueryRequest(int64_t rId) {
1,426,024✔
37
  SRequestObj* pReq = acquireRequest(rId);
1,426,024✔
38
  if (pReq != NULL) {
1,426,034✔
39
    pReq->isQuery = true;
1,426,032✔
40
    (void)releaseRequest(rId);
1,426,032✔
41
  }
42
}
1,426,024✔
43

44
static bool stringLengthCheck(const char* str, size_t maxsize) {
56,679✔
45
  if (str == NULL) {
56,679!
UNCOV
46
    return false;
×
47
  }
48

49
  size_t len = strlen(str);
56,679✔
50
  if (len <= 0 || len > maxsize) {
56,679!
UNCOV
51
    return false;
×
52
  }
53

54
  return true;
56,687✔
55
}
56

57
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
27,943✔
58

59
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_MAX_LEN); }
27,940✔
60

61
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
796✔
62

63
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
27,944✔
64
  char key[512] = {0};
27,944✔
65
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
27,944✔
66
  return taosStrdup(key);
27,944!
67
}
68

69
bool chkRequestKilled(void* param) {
34,795,528✔
70
  bool         killed = false;
34,795,528✔
71
  SRequestObj* pRequest = acquireRequest((int64_t)param);
34,795,528✔
72
  if (NULL == pRequest || pRequest->killed) {
35,042,263!
UNCOV
73
    killed = true;
×
74
  }
75

76
  (void)releaseRequest((int64_t)param);
35,042,263✔
77

78
  return killed;
34,987,470✔
79
}
80

81
void cleanupAppInfo() {
15,719✔
82
  taosHashCleanup(appInfo.pInstMap);
15,719✔
83
  taosHashCleanup(appInfo.pInstMapByClusterId);
15,719✔
84
}
15,719✔
85

86
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
87
                               SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
88

89
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
27,944✔
90
                              uint16_t port, int connType, STscObj** pObj) {
91
  TSC_ERR_RET(taos_init());
27,944!
92
  if (!validateUserName(user)) {
27,945!
UNCOV
93
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
×
94
  }
95

96
  char localDb[TSDB_DB_NAME_LEN] = {0};
27,945✔
97
  if (db != NULL && strlen(db) > 0) {
27,945!
98
    if (!validateDbName(db)) {
796!
UNCOV
99
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
×
100
    }
101

102
    tstrncpy(localDb, db, sizeof(localDb));
796✔
103
    (void)strdequote(localDb);
796✔
104
  }
105

106
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
27,944✔
107
  if (auth == NULL) {
27,944✔
108
    if (!validatePassword(pass)) {
27,940!
UNCOV
109
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
×
110
    }
111

112
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
27,945✔
113
  } else {
114
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
4✔
115
  }
116

117
  SCorEpSet epSet = {0};
27,942✔
118
  if (ip) {
27,942✔
119
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
12,001✔
120
  } else {
121
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
15,941!
122
  }
123

124
  if (port) {
27,945✔
125
    epSet.epSet.eps[0].port = port;
9,224✔
126
    epSet.epSet.eps[1].port = port;
9,224✔
127
  }
128

129
  char* key = getClusterKey(user, secretEncrypt, ip, port);
27,945✔
130
  if (NULL == key) {
27,945!
UNCOV
131
    TSC_ERR_RET(terrno);
×
132
  }
133
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
27,945!
134
          user, db, key);
135
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
71,838✔
136
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
43,893!
137
  }
138

139
  SAppInstInfo** pInst = NULL;
27,945✔
140
  int32_t        code = taosThreadMutexLock(&appInfo.mutex);
27,945✔
141
  if (TSDB_CODE_SUCCESS != code) {
27,945!
UNCOV
142
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
UNCOV
143
    TSC_ERR_RET(code);
×
144
  }
145

146
  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
27,945✔
147
  SAppInstInfo* p = NULL;
27,945✔
148
  if (pInst == NULL) {
27,945✔
149
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
15,956!
150
    if (NULL == p) {
15,956!
UNCOV
151
      TSC_ERR_JRET(terrno);
×
152
    }
153
    p->mgmtEp = epSet;
15,956✔
154
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
15,956✔
155
    if (TSDB_CODE_SUCCESS != code) {
15,956!
UNCOV
156
      taosMemoryFree(p);
×
UNCOV
157
      TSC_ERR_JRET(code);
×
158
    }
159
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
15,956✔
160
    if (TSDB_CODE_SUCCESS != code) {
15,956!
UNCOV
161
      taosMemoryFree(p);
×
UNCOV
162
      TSC_ERR_JRET(code);
×
163
    }
164
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
15,956✔
165
    if (TSDB_CODE_SUCCESS != code) {
15,956!
UNCOV
166
      destroyAppInst(&p);
×
UNCOV
167
      TSC_ERR_JRET(code);
×
168
    }
169
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
15,956✔
170
    if (TSDB_CODE_SUCCESS != code) {
15,956!
UNCOV
171
      destroyAppInst(&p);
×
UNCOV
172
      TSC_ERR_JRET(code);
×
173
    }
174
    p->instKey = key;
15,956✔
175
    key = NULL;
15,956✔
176
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
15,956!
177

178
    pInst = &p;
15,956✔
179
  } else {
180
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
11,989!
UNCOV
181
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
×
UNCOV
182
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
×
183
    }
184
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
185
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
11,989✔
186
  }
187

188
_return:
27,945✔
189

190
  if (TSDB_CODE_SUCCESS != code) {
27,945!
191
    (void)taosThreadMutexUnlock(&appInfo.mutex);
×
UNCOV
192
    taosMemoryFreeClear(key);
×
UNCOV
193
    return code;
×
194
  } else {
195
    code = taosThreadMutexUnlock(&appInfo.mutex);
27,945✔
196
    taosMemoryFreeClear(key);
27,945!
197
    if (TSDB_CODE_SUCCESS != code) {
27,945!
UNCOV
198
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
UNCOV
199
      return code;
×
200
    }
201
    return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType, pObj);
27,945✔
202
  }
203
}
204

205
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
206
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
207
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
208
//     return *ppAppInstInfo;
209
//   } else {
210
//     return NULL;
211
//   }
212
// }
213

214
void freeQueryParam(SSyncQueryParam* param) {
915✔
215
  if (param == NULL) return;
915!
216
  if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
915!
UNCOV
217
    tscError("failed to destroy semaphore in freeQueryParam");
×
218
  }
219
  taosMemoryFree(param);
915!
220
}
221

222
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
10,748,506✔
223
                     SRequestObj** pRequest, int64_t reqid) {
224
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
10,748,506✔
225
  if (TSDB_CODE_SUCCESS != code) {
10,760,456!
UNCOV
226
    tscError("failed to malloc sqlObj, %s", sql);
×
UNCOV
227
    return code;
×
228
  }
229

230
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
10,760,456!
231
  if ((*pRequest)->sqlstr == NULL) {
10,751,866!
232
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
233
    destroyRequest(*pRequest);
×
UNCOV
234
    *pRequest = NULL;
×
UNCOV
235
    return terrno;
×
236
  }
237

238
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
10,751,866✔
239
  (*pRequest)->sqlstr[sqlLen] = 0;
10,760,364✔
240
  (*pRequest)->sqlLen = sqlLen;
10,760,364✔
241
  (*pRequest)->validateOnly = validateSql;
10,760,364✔
242
  (*pRequest)->isStmtBind = false;
10,760,364✔
243

244
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
10,760,364✔
245

246
  STscObj* pTscObj = (*pRequest)->pTscObj;
10,760,364✔
247
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
10,760,364✔
248
                             sizeof((*pRequest)->self));
249
  if (err) {
10,754,508!
250
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", connObj:%" PRId64 ", %s",
×
251
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
252
    destroyRequest(*pRequest);
×
UNCOV
253
    *pRequest = NULL;
×
UNCOV
254
    return terrno;
×
255
  }
256

257
  (*pRequest)->allocatorRefId = -1;
10,754,508✔
258
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
10,754,508!
259
    if (TSDB_CODE_SUCCESS !=
1,171,392!
260
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
1,171,375✔
261
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", connObj:%" PRId64 ", %s", (*pRequest)->self,
×
262
               (*pRequest)->requestId, pTscObj->id, sql);
263
      destroyRequest(*pRequest);
×
UNCOV
264
      *pRequest = NULL;
×
UNCOV
265
      return terrno;
×
266
    }
267
  }
268

269
  tscDebugL("req:0x%" PRIx64 ", QID:0x%" PRIx64 ", build request", (*pRequest)->self, (*pRequest)->requestId);
10,756,162✔
270
  return TSDB_CODE_SUCCESS;
10,753,660✔
271
}
272

273
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
514✔
274
  int32_t code =
275
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
514✔
276
  if (TSDB_CODE_SUCCESS == code) {
514!
277
    pRequest->relation.prevRefId = (*pNewRequest)->self;
514✔
278
    (*pNewRequest)->relation.nextRefId = pRequest->self;
514✔
279
    (*pNewRequest)->relation.userRefId = pRequest->self;
514✔
280
    (*pNewRequest)->isSubReq = true;
514✔
281
  }
282
  return code;
514✔
283
}
284

285
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
10,444✔
286
  STscObj* pTscObj = pRequest->pTscObj;
10,444✔
287

288
  SParseContext cxt = {.requestId = pRequest->requestId,
10,444✔
289
                       .requestRid = pRequest->self,
10,444✔
290
                       .acctId = pTscObj->acctId,
10,444✔
291
                       .db = pRequest->pDb,
10,444✔
292
                       .topicQuery = topicQuery,
293
                       .pSql = pRequest->sqlstr,
10,444✔
294
                       .sqlLen = pRequest->sqlLen,
10,444✔
295
                       .pMsg = pRequest->msgBuf,
10,444✔
296
                       .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
297
                       .pTransporter = pTscObj->pAppInfo->pTransporter,
10,444✔
298
                       .pStmtCb = pStmtCb,
299
                       .pUser = pTscObj->user,
10,444✔
300
                       .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
10,444✔
301
                       .enableSysInfo = pTscObj->sysInfo,
10,444✔
302
                       .svrVer = pTscObj->sVer,
10,444✔
303
                       .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
10,444✔
304
                       .isStmtBind = pRequest->isStmtBind,
10,444✔
305
                       .setQueryFp = setQueryRequest,
306
                       .timezone = pTscObj->optionInfo.timezone,
10,444✔
307
                       .charsetCxt = pTscObj->optionInfo.charsetCxt,};
10,444✔
308

309
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
10,444✔
310
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
10,452✔
311
  if (code != TSDB_CODE_SUCCESS) {
10,451!
UNCOV
312
    return code;
×
313
  }
314

315
  code = qParseSql(&cxt, pQuery);
10,451✔
316
  if (TSDB_CODE_SUCCESS == code) {
10,450✔
317
    if ((*pQuery)->haveResultSet) {
10,441!
UNCOV
318
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols, (*pQuery)->pResExtSchema, pRequest->isStmtBind);
×
UNCOV
319
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
×
320
    }
321
  }
322

323
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
10,449!
324
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
10,440✔
325
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
10,440✔
326
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
10,440✔
327
  }
328

329
  taosArrayDestroy(cxt.pTableMetaPos);
10,449✔
330
  taosArrayDestroy(cxt.pTableVgroupPos);
10,449✔
331

332
  return code;
10,450✔
333
}
334

335
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
×
336
  SRetrieveTableRsp* pRsp = NULL;
×
337
  int8_t             biMode = atomic_load_8(&pRequest->pTscObj->biMode);
×
338
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode, pRequest->pTscObj->optionInfo.charsetCxt);
×
UNCOV
339
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
×
UNCOV
340
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4, pRequest->isStmtBind);
×
341
  }
342

UNCOV
343
  return code;
×
344
}
345

346
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
542✔
347
  // drop table if exists not_exists_table
348
  if (NULL == pQuery->pCmdMsg) {
542!
UNCOV
349
    return TSDB_CODE_SUCCESS;
×
350
  }
351

352
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
542✔
353
  pRequest->type = pMsgInfo->msgType;
542✔
354
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
542✔
355
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
542✔
356

357
  STscObj*      pTscObj = pRequest->pTscObj;
542✔
358
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
542✔
359

360
  // int64_t transporterId = 0;
361
  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
541!
362
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
543!
363
  return TSDB_CODE_SUCCESS;
543✔
364
}
365

366
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
20,739,906✔
367

368
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
118,930✔
369
  SRetrieveTableRsp* pRsp = NULL;
118,930✔
370
  if (pRequest->validateOnly) {
118,930✔
371
    doRequestCallback(pRequest, 0);
27✔
372
    return;
27✔
373
  }
374

375
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
118,903✔
376
                              atomic_load_8(&pRequest->pTscObj->biMode), pRequest->pTscObj->optionInfo.charsetCxt);
118,903✔
377
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
118,903✔
378
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4, pRequest->isStmtBind);
97,091✔
379
  }
380

381
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
118,903✔
382
  pRequest->code = code;
118,903✔
383

384
  if (pRequest->code != TSDB_CODE_SUCCESS) {
118,903✔
385
    pResultInfo->numOfRows = 0;
3✔
386
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
3!
387
             pRequest->requestId);
388
  } else {
389
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
118,900✔
390
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
391
             pRequest->requestId);
392
  }
393

394
  doRequestCallback(pRequest, code);
118,903✔
395
}
396

397
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
33,487✔
398
  if (pRequest->validateOnly) {
33,487!
UNCOV
399
    doRequestCallback(pRequest, 0);
×
UNCOV
400
    return TSDB_CODE_SUCCESS;
×
401
  }
402

403
  // drop table if exists not_exists_table
404
  if (NULL == pQuery->pCmdMsg) {
33,487✔
405
    doRequestCallback(pRequest, 0);
1✔
406
    return TSDB_CODE_SUCCESS;
1✔
407
  }
408

409
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
33,486✔
410
  pRequest->type = pMsgInfo->msgType;
33,486✔
411
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
33,486✔
412
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
33,486✔
413

414
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
33,486✔
415
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
33,486✔
416

417
  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
33,485✔
418
  if (code) {
33,486!
UNCOV
419
    doRequestCallback(pRequest, code);
×
420
  }
421
  return code;
33,486✔
422
}
423

424
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
214,579✔
425
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
214,579✔
426
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
214,579✔
427

428
  if (node1->load < node2->load) {
214,579!
UNCOV
429
    return -1;
×
430
  }
431

432
  return node1->load > node2->load;
214,579✔
433
}
434

435
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
77,518✔
436
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
77,518!
437
  if (pInfo->pQnodeList) {
77,518✔
438
    taosArrayDestroy(pInfo->pQnodeList);
76,886✔
439
    pInfo->pQnodeList = NULL;
76,886✔
440
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
76,886✔
441
  }
442

443
  if (pNodeList) {
77,518!
444
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
77,518✔
445
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
77,518✔
446
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
77,518✔
447
             taosArrayGetSize(pInfo->pQnodeList));
448
  }
449
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
77,518!
450

451
  return TSDB_CODE_SUCCESS;
77,518✔
452
}
453

454
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
10,723,790✔
455
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
10,723,790✔
456
    *required = false;
10,097,914✔
457
    return TSDB_CODE_SUCCESS;
10,097,914✔
458
  }
459

460
  int32_t       code = TSDB_CODE_SUCCESS;
625,876✔
461
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
625,876✔
462
  *required = false;
625,876✔
463

464
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
625,876!
465
  *required = (NULL == pInfo->pQnodeList);
625,876✔
466
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
625,876!
467
  return TSDB_CODE_SUCCESS;
625,876✔
468
}
469

470
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
×
UNCOV
471
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
×
472
  int32_t       code = 0;
×
473

474
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
×
UNCOV
475
  if (pInfo->pQnodeList) {
×
476
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
×
477
  }
478
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
×
479
  if (NULL == *pNodeList) {
×
480
    SCatalog* pCatalog = NULL;
×
481
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
482
    if (TSDB_CODE_SUCCESS == code) {
×
483
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
×
UNCOV
484
      if (NULL == pNodeList) {
×
485
        TSC_ERR_RET(terrno);
×
486
      }
487
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
488
                               .requestId = pRequest->requestId,
×
489
                               .requestObjRefId = pRequest->self,
×
UNCOV
490
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
UNCOV
491
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
×
492
    }
493

UNCOV
494
    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
×
UNCOV
495
      code = updateQnodeList(pInfo, *pNodeList);
×
496
    }
497
  }
498

UNCOV
499
  return code;
×
500
}
501

502
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
34,103✔
503
  pRequest->type = pQuery->msgType;
34,103✔
504
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
34,103✔
505

506
  SPlanContext cxt = {.queryId = pRequest->requestId,
68,224✔
507
                      .acctId = pRequest->pTscObj->acctId,
34,103✔
508
                      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
34,103✔
509
                      .pAstRoot = pQuery->pRoot,
34,121✔
510
                      .showRewrite = pQuery->showRewrite,
34,121✔
511
                      .pMsg = pRequest->msgBuf,
34,121✔
512
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
513
                      .pUser = pRequest->pTscObj->user,
34,121✔
514
                      .timezone = pRequest->pTscObj->optionInfo.timezone,
34,121✔
515
                      .sysInfo = pRequest->pTscObj->sysInfo};
34,121✔
516

517
  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
34,121✔
518
}
519

520
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols, const SExtSchema* pExtSchema, bool isStmt) {
1,220,304✔
521
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
1,220,304!
UNCOV
522
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
UNCOV
523
    return TSDB_CODE_INVALID_PARA;
×
524
  }
525

526
  pResInfo->numOfCols = numOfCols;
1,220,336✔
527
  if (pResInfo->fields != NULL) {
1,220,336✔
528
    taosMemoryFree(pResInfo->fields);
41!
529
  }
530
  if (pResInfo->userFields != NULL) {
1,220,336✔
531
    taosMemoryFree(pResInfo->userFields);
41!
532
  }
533
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
1,220,336!
534
  if (NULL == pResInfo->fields) return terrno;
1,220,324!
535
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
1,220,324!
536
  if (NULL == pResInfo->userFields) {
1,220,292!
UNCOV
537
    taosMemoryFree(pResInfo->fields);
×
UNCOV
538
    return terrno;
×
539
  }
540
  if (numOfCols != pResInfo->numOfCols) {
1,220,292!
UNCOV
541
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
UNCOV
542
    return TSDB_CODE_FAILED;
×
543
  }
544

545
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
5,044,959✔
546
    pResInfo->fields[i].type = pSchema[i].type;
3,824,646✔
547

548
    pResInfo->userFields[i].type = pSchema[i].type;
3,824,646✔
549
    // userFields must convert to type bytes, no matter isStmt or not
550
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
3,824,646✔
551
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
3,824,673✔
552
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
3,824,668!
553
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
7,969✔
554
    }
555

556
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
3,824,667✔
557
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
3,824,667✔
558
  }
559
  return TSDB_CODE_SUCCESS;
1,220,313✔
560
}
561

562
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
886,122✔
563
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
886,122!
564
      precision != TSDB_TIME_PRECISION_NANO) {
UNCOV
565
    return;
×
566
  }
567

568
  pResInfo->precision = precision;
886,122✔
569
}
570

571
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
603,889✔
572
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
603,889✔
573
  if (NULL == nodeList) {
603,945!
UNCOV
574
    return terrno;
×
575
  }
576
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
603,952✔
577

578
  int32_t dbNum = taosArrayGetSize(pDbVgList);
603,952✔
579
  for (int32_t i = 0; i < dbNum; ++i) {
1,184,100✔
580
    SArray* pVg = taosArrayGetP(pDbVgList, i);
580,192✔
581
    if (NULL == pVg) {
580,196!
UNCOV
582
      continue;
×
583
    }
584
    int32_t vgNum = taosArrayGetSize(pVg);
580,196✔
585
    if (vgNum <= 0) {
580,196✔
586
      continue;
630✔
587
    }
588

589
    for (int32_t j = 0; j < vgNum; ++j) {
3,001,276✔
590
      SVgroupInfo* pInfo = taosArrayGet(pVg, j);
2,421,772✔
591
      if (NULL == pInfo) {
2,421,589!
UNCOV
592
        taosArrayDestroy(nodeList);
×
593
        return TSDB_CODE_OUT_OF_RANGE;
×
594
      }
595
      SQueryNodeLoad load = {0};
2,421,589✔
596
      load.addr.nodeId = pInfo->vgId;
2,421,589✔
597
      load.addr.epSet = pInfo->epSet;
2,421,589✔
598

599
      if (NULL == taosArrayPush(nodeList, &load)) {
2,421,710!
UNCOV
600
        taosArrayDestroy(nodeList);
×
601
        return terrno;
×
602
      }
603
    }
604
  }
605

606
  int32_t vnodeNum = taosArrayGetSize(nodeList);
603,908✔
607
  if (vnodeNum > 0) {
603,967✔
608
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
578,843✔
609
    goto _return;
578,837✔
610
  }
611

612
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
25,124✔
613
  if (mnodeNum <= 0) {
25,131!
UNCOV
614
    tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
×
615
    goto _return;
×
616
  }
617

618
  void* pData = taosArrayGet(pMnodeList, 0);
25,131✔
619
  if (NULL == pData) {
25,132!
UNCOV
620
    taosArrayDestroy(nodeList);
×
621
    return TSDB_CODE_OUT_OF_RANGE;
×
622
  }
623
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
25,132!
UNCOV
624
    taosArrayDestroy(nodeList);
×
625
    return terrno;
×
626
  }
627

628
  tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
25,132✔
629

630
_return:
21,579✔
631

632
  *pNodeList = nodeList;
603,965✔
633

634
  return TSDB_CODE_SUCCESS;
603,965✔
635
}
636

637
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
279,209✔
638
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
279,209✔
639
  if (NULL == nodeList) {
279,209!
UNCOV
640
    return terrno;
×
641
  }
642

643
  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
279,209✔
644
  if (qNodeNum > 0) {
279,208✔
645
    void* pData = taosArrayGet(pQnodeList, 0);
278,886✔
646
    if (NULL == pData) {
278,886!
UNCOV
647
      taosArrayDestroy(nodeList);
×
648
      return TSDB_CODE_OUT_OF_RANGE;
×
649
    }
650
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
278,886!
UNCOV
651
      taosArrayDestroy(nodeList);
×
652
      return terrno;
×
653
    }
654
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
278,887✔
655
    goto _return;
278,887✔
656
  }
657

658
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
322✔
659
  if (mnodeNum <= 0) {
322✔
660
    tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
4!
661
    goto _return;
4✔
662
  }
663

664
  void* pData = taosArrayGet(pMnodeList, 0);
318✔
665
  if (NULL == pData) {
318!
UNCOV
666
    taosArrayDestroy(nodeList);
×
667
    return TSDB_CODE_OUT_OF_RANGE;
×
668
  }
669
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
318!
UNCOV
670
    taosArrayDestroy(nodeList);
×
671
    return terrno;
×
672
  }
673

674
  tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
318!
675

UNCOV
676
_return:
×
677

678
  *pNodeList = nodeList;
279,209✔
679

680
  return TSDB_CODE_SUCCESS;
279,209✔
681
}
682

683
void freeVgList(void* list) {
13,796✔
684
  SArray* pList = *(SArray**)list;
13,796✔
685
  taosArrayDestroy(pList);
13,796✔
686
}
13,806✔
687

688
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
848,974✔
689
  SArray* pDbVgList = NULL;
848,974✔
690
  SArray* pQnodeList = NULL;
848,974✔
691
  FDelete fp = NULL;
848,974✔
692
  int32_t code = 0;
848,974✔
693

694
  switch (tsQueryPolicy) {
848,974!
695
    case QUERY_POLICY_VNODE:
569,796✔
696
    case QUERY_POLICY_CLIENT: {
697
      if (pResultMeta) {
569,796!
698
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
569,820✔
699
        if (NULL == pDbVgList) {
569,819!
UNCOV
700
          code = terrno;
×
701
          goto _return;
×
702
        }
703
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
569,819✔
704
        for (int32_t i = 0; i < dbNum; ++i) {
1,136,157✔
705
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
566,365✔
706
          if (pRes->code || NULL == pRes->pRes) {
566,333!
UNCOV
707
            continue;
×
708
          }
709

710
          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
1,132,746!
UNCOV
711
            code = terrno;
×
712
            goto _return;
×
713
          }
714
        }
715
      } else {
UNCOV
716
        fp = freeVgList;
×
717

UNCOV
718
        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
×
719
        if (dbNum > 0) {
×
720
          SCatalog*     pCtg = NULL;
×
721
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
×
722
          code = catalogGetHandle(pInst->clusterId, &pCtg);
×
723
          if (code != TSDB_CODE_SUCCESS) {
×
724
            goto _return;
×
725
          }
726

UNCOV
727
          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
×
728
          if (NULL == pDbVgList) {
×
729
            code = terrno;
×
730
            goto _return;
×
731
          }
UNCOV
732
          SArray* pVgList = NULL;
×
733
          for (int32_t i = 0; i < dbNum; ++i) {
×
734
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
×
735
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
×
736
                                     .requestId = pRequest->requestId,
×
737
                                     .requestObjRefId = pRequest->self,
×
738
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
×
739

740
            // catalogGetDBVgList will handle dbFName == null.
UNCOV
741
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
×
742
            if (code) {
×
743
              goto _return;
×
744
            }
745

UNCOV
746
            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
×
747
              code = terrno;
×
748
              goto _return;
×
749
            }
750
          }
751
        }
752
      }
753

754
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
569,792✔
755
      break;
569,847✔
756
    }
757
    case QUERY_POLICY_HYBRID:
279,205✔
758
    case QUERY_POLICY_QNODE: {
759
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
284,773!
760
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
5,568✔
761
        if (pRes->code) {
5,568!
UNCOV
762
          pQnodeList = NULL;
×
763
        } else {
764
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
5,568✔
765
          if (NULL == pQnodeList) {
5,568!
UNCOV
766
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
767
            goto _return;
×
768
          }
769
        }
770
      } else {
771
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
273,640✔
772
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
273,640!
773
        if (pInst->pQnodeList) {
273,641!
774
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
273,641✔
775
          if (NULL == pQnodeList) {
273,641!
UNCOV
776
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
777
            goto _return;
×
778
          }
779
        }
780
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
273,641!
781
      }
782

783
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
279,209✔
784
      break;
279,209✔
785
    }
UNCOV
786
    default:
×
787
      tscError("unknown query policy: %d", tsQueryPolicy);
×
788
      return TSDB_CODE_APP_ERROR;
×
789
  }
790

791
_return:
849,056✔
792
  taosArrayDestroyEx(pDbVgList, fp);
849,056✔
793
  taosArrayDestroy(pQnodeList);
849,082✔
794

795
  return code;
849,087✔
796
}
797

798
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
34,093✔
799
  SArray* pDbVgList = NULL;
34,093✔
800
  SArray* pQnodeList = NULL;
34,093✔
801
  int32_t code = 0;
34,093✔
802

803
  switch (tsQueryPolicy) {
34,093!
804
    case QUERY_POLICY_VNODE:
34,099✔
805
    case QUERY_POLICY_CLIENT: {
806
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
34,099✔
807
      if (dbNum > 0) {
34,107✔
808
        SCatalog*     pCtg = NULL;
13,799✔
809
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
13,799✔
810
        code = catalogGetHandle(pInst->clusterId, &pCtg);
13,799✔
811
        if (code != TSDB_CODE_SUCCESS) {
13,800!
UNCOV
812
          goto _return;
×
813
        }
814

815
        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
13,800✔
816
        if (NULL == pDbVgList) {
13,801✔
817
          code = terrno;
1✔
UNCOV
818
          goto _return;
×
819
        }
820
        SArray* pVgList = NULL;
13,800✔
821
        for (int32_t i = 0; i < dbNum; ++i) {
27,602✔
822
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
13,796✔
823
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
13,798✔
824
                                   .requestId = pRequest->requestId,
13,798✔
825
                                   .requestObjRefId = pRequest->self,
13,798✔
826
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
13,798✔
827

828
          // catalogGetDBVgList will handle dbFName == null.
829
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
13,808✔
830
          if (code) {
13,803!
UNCOV
831
            goto _return;
×
832
          }
833

834
          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
13,802!
UNCOV
835
            code = terrno;
×
836
            goto _return;
×
837
          }
838
        }
839
      }
840

841
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
34,114✔
842
      break;
34,111✔
843
    }
UNCOV
844
    case QUERY_POLICY_HYBRID:
×
845
    case QUERY_POLICY_QNODE: {
UNCOV
846
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
×
847

UNCOV
848
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
×
849
      break;
×
850
    }
UNCOV
851
    default:
×
852
      tscError("unknown query policy: %d", tsQueryPolicy);
×
853
      return TSDB_CODE_APP_ERROR;
×
854
  }
855

856
_return:
34,111✔
857

858
  taosArrayDestroyEx(pDbVgList, freeVgList);
34,111✔
859
  taosArrayDestroy(pQnodeList);
34,108✔
860

861
  return code;
34,114✔
862
}
863

864
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
34,096✔
865
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
34,096✔
866

867
  SExecResult      res = {0};
34,096✔
868
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
34,096✔
869
                           .requestId = pRequest->requestId,
34,096✔
870
                           .requestObjRefId = pRequest->self};
34,096✔
871
  SSchedulerReq    req = {
68,195✔
872
         .syncReq = true,
873
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
34,096✔
874
         .pConn = &conn,
875
         .pNodeList = pNodeList,
876
         .pDag = pDag,
877
         .sql = pRequest->sqlstr,
34,096✔
878
         .startTs = pRequest->metric.start,
34,096✔
879
         .execFp = NULL,
880
         .cbParam = NULL,
881
         .chkKillFp = chkRequestKilled,
882
         .chkKillParam = (void*)pRequest->self,
34,096✔
883
         .pExecRes = &res,
884
         .source = pRequest->source,
34,096✔
885
         .pWorkerCb = getTaskPoolWorkerCb(),
34,096✔
886
  };
887

888
  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
34,099✔
889

890
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
34,110✔
891
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
34,107✔
892

893
  if (code != TSDB_CODE_SUCCESS) {
34,107!
UNCOV
894
    schedulerFreeJob(&pRequest->body.queryJob, 0);
×
895

UNCOV
896
    pRequest->code = code;
×
897
    terrno = code;
×
898
    return pRequest->code;
5✔
899
  }
900

901
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
34,107!
902
      TDMT_VND_CREATE_TABLE == pRequest->type) {
166✔
903
    pRequest->body.resInfo.numOfRows = res.numOfRows;
34,077✔
904
    if (TDMT_VND_SUBMIT == pRequest->type) {
34,077✔
905
      STscObj*            pTscObj = pRequest->pTscObj;
33,953✔
906
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
33,953✔
907
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
33,953✔
908
    }
909

910
    schedulerFreeJob(&pRequest->body.queryJob, 0);
34,081✔
911
  }
912

913
  pRequest->code = res.code;
34,120✔
914
  terrno = res.code;
34,120✔
915
  return pRequest->code;
34,113✔
916
}
917

918
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
9,574,824✔
919
  SArray*      pArray = NULL;
9,574,824✔
920
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
9,574,824✔
921
  if (NULL == pRsp->aCreateTbRsp) {
9,574,824✔
922
    return TSDB_CODE_SUCCESS;
9,523,973✔
923
  }
924

925
  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
50,851✔
926
  for (int32_t i = 0; i < tbNum; ++i) {
117,155✔
927
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
61,402✔
928
    if (pTbRsp->pMeta) {
61,400✔
929
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
40,221!
930
    }
931
  }
932

933
  return TSDB_CODE_SUCCESS;
55,753✔
934
}
935

936
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
662,813✔
937
  int32_t code = 0;
662,813✔
938
  SArray* pArray = NULL;
662,813✔
939
  SArray* pTbArray = (SArray*)res;
662,813✔
940
  int32_t tbNum = taosArrayGetSize(pTbArray);
662,813✔
941
  if (tbNum <= 0) {
662,816!
UNCOV
942
    return TSDB_CODE_SUCCESS;
×
943
  }
944

945
  pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
662,816✔
946
  if (NULL == pArray) {
662,839!
UNCOV
947
    return terrno;
×
948
  }
949

950
  for (int32_t i = 0; i < tbNum; ++i) {
1,840,512✔
951
    STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
1,177,669✔
952
    if (NULL == tbInfo) {
1,177,658!
UNCOV
953
      code = terrno;
×
954
      goto _return;
×
955
    }
956
    STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion};
1,177,658✔
957
    if (NULL == taosArrayPush(pArray, &tbSver)) {
1,177,673!
UNCOV
958
      code = terrno;
×
959
      goto _return;
×
960
    }
961
  }
962

963
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
662,843✔
964
                           .requestId = pRequest->requestId,
662,843✔
965
                           .requestObjRefId = pRequest->self,
662,843✔
966
                           .mgmtEps = *epset};
967

968
  code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
662,843✔
969

970
_return:
662,813✔
971

972
  taosArrayDestroy(pArray);
662,813✔
973
  return code;
662,834✔
974
}
975

976
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
3,736✔
977
  return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
3,736✔
978
}
979

980
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
142,701✔
981
  return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
142,701✔
982
}
983

984
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
10,483,798✔
985
  if (NULL == pRequest->body.resInfo.execRes.res) {
10,483,798✔
986
    return pRequest->code;
200,360✔
987
  }
988

989
  SCatalog*     pCatalog = NULL;
10,283,438✔
990
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
10,283,438✔
991

992
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
10,282,256✔
993
  if (code) {
10,299,434!
UNCOV
994
    return code;
×
995
  }
996

997
  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
10,299,434✔
998
  SExecResult* pRes = &pRequest->body.resInfo.execRes;
10,309,466✔
999

1000
  switch (pRes->msgType) {
10,309,466✔
1001
    case TDMT_VND_ALTER_TABLE:
619✔
1002
    case TDMT_MND_ALTER_STB: {
1003
      code = handleAlterTbExecRes(pRes->res, pCatalog);
619✔
1004
      break;
619✔
1005
    }
1006
    case TDMT_VND_CREATE_TABLE: {
62,348✔
1007
      SArray* pList = (SArray*)pRes->res;
62,348✔
1008
      int32_t num = taosArrayGetSize(pList);
62,348✔
1009
      for (int32_t i = 0; i < num; ++i) {
158,959✔
1010
        void* res = taosArrayGetP(pList, i);
96,613✔
1011
        // handleCreateTbExecRes will handle res == null
1012
        code = handleCreateTbExecRes(res, pCatalog);
96,608✔
1013
      }
1014
      break;
62,346✔
1015
    }
1016
    case TDMT_MND_CREATE_STB: {
334✔
1017
      code = handleCreateTbExecRes(pRes->res, pCatalog);
334✔
1018
      break;
334✔
1019
    }
1020
    case TDMT_VND_SUBMIT: {
9,581,564✔
1021
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
9,581,564✔
1022

1023
      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
9,584,626✔
1024
      break;
9,576,402✔
1025
    }
1026
    case TDMT_SCH_QUERY:
662,834✔
1027
    case TDMT_SCH_MERGE_QUERY: {
1028
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
662,834✔
1029
      break;
662,815✔
1030
    }
1031
    default:
1,767✔
1032
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self, pRequest->type,
1,767!
1033
               pRequest->requestId);
UNCOV
1034
      code = TSDB_CODE_APP_ERROR;
×
1035
  }
1036

1037
  return code;
10,302,516✔
1038
}
1039

1040
static bool incompletaFileParsing(SNode* pStmt) {
10,476,652✔
1041
  return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
10,476,652!
1042
}
1043

1044
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
489✔
1045
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
489✔
1046

1047
  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
489✔
1048
  if (TSDB_CODE_SUCCESS == code) {
489!
1049
    int64_t analyseStart = taosGetTimestampUs();
489✔
1050
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
489✔
1051
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
489✔
1052
  }
1053

1054
  if (TSDB_CODE_SUCCESS == code) {
489!
1055
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
489✔
1056
  }
1057

1058
  code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
489✔
1059
  handleQueryAnslyseRes(pWrapper, NULL, code);
489✔
1060
}
489✔
1061

1062
void returnToUser(SRequestObj* pRequest) {
57,545✔
1063
  if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
57,545!
1064
    // return to client
1065
    doRequestCallback(pRequest, pRequest->code);
57,544✔
1066
    return;
57,544✔
1067
  }
1068

1069
  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
1✔
1070
  if (pUserReq) {
1!
1071
    pUserReq->code = pRequest->code;
1✔
1072
    // return to client
1073
    doRequestCallback(pUserReq, pUserReq->code);
1✔
1074
    (void)releaseRequest(pRequest->relation.userRefId);
1✔
1075
    return;
1✔
1076
  } else {
UNCOV
1077
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1078
             pRequest->relation.userRefId, pRequest->requestId);
1079
  }
1080
}
1081

1082
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
489✔
1083
  int64_t     lastTs = 0;
489✔
1084
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
489✔
1085
  int32_t     numOfFields = taos_num_fields(pRes);
489✔
1086

1087
  int32_t code = createDataBlock(pBlock);
489✔
1088
  if (code) {
489!
UNCOV
1089
    return code;
×
1090
  }
1091

1092
  for (int32_t i = 0; i < numOfFields; ++i) {
1,956✔
1093
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
1,467✔
1094
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
1,467✔
1095
    if (TSDB_CODE_SUCCESS != code) {
1,467!
UNCOV
1096
      blockDataDestroy(*pBlock);
×
1097
      return code;
×
1098
    }
1099
  }
1100

1101
  code = blockDataEnsureCapacity(*pBlock, numOfRows);
489✔
1102
  if (TSDB_CODE_SUCCESS != code) {
489!
UNCOV
1103
    blockDataDestroy(*pBlock);
×
1104
    return code;
×
1105
  }
1106

1107
  for (int32_t i = 0; i < numOfRows; ++i) {
1,524✔
1108
    TAOS_ROW pRow = taos_fetch_row(pRes);
1,035✔
1109
    if (NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
1,035!
UNCOV
1110
      tscError("invalid data from vnode");
×
1111
      blockDataDestroy(*pBlock);
×
1112
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1113
    }
1114
    int64_t ts = *(int64_t*)pRow[0];
1,035✔
1115
    if (lastTs < ts) {
1,035✔
1116
      lastTs = ts;
583✔
1117
    }
1118

1119
    for (int32_t j = 0; j < numOfFields; ++j) {
4,140✔
1120
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
3,105✔
1121
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
3,105✔
1122
      if (TSDB_CODE_SUCCESS != code) {
3,105!
UNCOV
1123
        blockDataDestroy(*pBlock);
×
1124
        return code;
×
1125
      }
1126
    }
1127

1128
    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]);
1,035!
1129
  }
1130

1131
  (*pBlock)->info.window.ekey = lastTs;
489✔
1132
  (*pBlock)->info.rows = numOfRows;
489✔
1133

1134
  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
489!
1135
  return TSDB_CODE_SUCCESS;
489✔
1136
}
1137

1138
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
490✔
1139
  SRequestObj* pRequest = (SRequestObj*)res;
490✔
1140
  if (pRequest->code) {
490✔
1141
    returnToUser(pRequest);
1✔
1142
    return;
1✔
1143
  }
1144

1145
  SSDataBlock* pBlock = NULL;
489✔
1146
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
489✔
1147
  if (TSDB_CODE_SUCCESS != pRequest->code) {
489!
UNCOV
1148
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
×
1149
             tstrerror(pRequest->code));
UNCOV
1150
    returnToUser(pRequest);
×
1151
    return;
×
1152
  }
1153

1154
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
489✔
1155
  if (pNextReq) {
489!
1156
    continuePostSubQuery(pNextReq, pBlock);
489✔
1157
    (void)releaseRequest(pRequest->relation.nextRefId);
489✔
1158
  } else {
UNCOV
1159
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1160
             pRequest->relation.nextRefId, pRequest->requestId);
1161
  }
1162

1163
  blockDataDestroy(pBlock);
489✔
1164
}
1165

1166
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
490✔
1167
  SRequestObj* pRequest = pWrapper->pRequest;
490✔
1168
  if (TD_RES_QUERY(pRequest)) {
490!
1169
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
490✔
1170
    return;
490✔
1171
  }
1172

UNCOV
1173
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1174
  if (pNextReq) {
×
1175
    continuePostSubQuery(pNextReq, NULL);
×
1176
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1177
  } else {
UNCOV
1178
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1179
             pRequest->relation.nextRefId, pRequest->requestId);
1180
  }
1181
}
1182

1183
// todo refacto the error code  mgmt
1184
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
10,449,445✔
1185
  SSqlCallbackWrapper* pWrapper = param;
10,449,445✔
1186
  SRequestObj*         pRequest = pWrapper->pRequest;
10,449,445✔
1187
  STscObj*             pTscObj = pRequest->pTscObj;
10,449,445✔
1188

1189
  pRequest->code = code;
10,449,445✔
1190
  if (pResult) {
10,449,445!
1191
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
10,454,605✔
1192
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
10,457,879✔
1193
  }
1194

1195
  int32_t type = pRequest->type;
10,452,719✔
1196
  if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
10,452,719✔
1197
    if (pResult) {
9,663,491!
1198
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;
9,670,064✔
1199

1200
      // record the insert rows
1201
      if (TDMT_VND_SUBMIT == type) {
9,670,064✔
1202
        SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
9,538,132✔
1203
        (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
9,538,132✔
1204
      }
1205
    }
1206
    schedulerFreeJob(&pRequest->body.queryJob, 0);
9,683,867✔
1207
  }
1208

1209
  taosMemoryFree(pResult);
10,475,276!
1210
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
10,480,528✔
1211
           pRequest->requestId);
1212

1213
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
10,465,063!
1214
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64, pRequest->self,
269✔
1215
             tstrerror(code), pRequest->retry, pRequest->requestId);
1216
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
269!
UNCOV
1217
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1218
    }
1219
    restartAsyncQuery(pRequest, code);
269✔
1220
    return;
269✔
1221
  }
1222

1223
  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
10,464,794!
1224
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
10,464,794!
1225
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
12,706!
UNCOV
1226
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1227
    }
1228
  }
1229

1230
  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
10,459,778✔
1231
  int32_t code1 = handleQueryExecRsp(pRequest);
10,459,778✔
1232
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
10,470,148!
UNCOV
1233
    pRequest->code = code1;
×
1234
  }
1235

1236
  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
20,947,530!
1237
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
10,475,233✔
UNCOV
1238
    continueInsertFromCsv(pWrapper, pRequest);
×
1239
    return;
×
1240
  }
1241

1242
  if (pRequest->relation.nextRefId) {
10,479,584✔
1243
    handlePostSubQuery(pWrapper);
490✔
1244
  } else {
1245
    destorySqlCallbackWrapper(pWrapper);
10,479,094✔
1246
    pRequest->pWrapper = NULL;
10,479,706✔
1247

1248
    // return to client
1249
    doRequestCallback(pRequest, code);
10,479,706✔
1250
  }
1251
}
1252

1253
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
34,653✔
1254
  int32_t code = 0;
34,653✔
1255
  int32_t subplanNum = 0;
34,653✔
1256

1257
  if (pQuery->pRoot) {
34,653✔
1258
    pRequest->stmtType = pQuery->pRoot->type;
34,117✔
1259
  }
1260

1261
  if (pQuery->pRoot && !pRequest->inRetry) {
34,653!
1262
    STscObj*            pTscObj = pRequest->pTscObj;
34,120✔
1263
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
34,120✔
1264
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
34,120✔
1265
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
34,119✔
1266
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
1!
1267
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
2✔
1268
    }
1269
  }
1270

1271
  pRequest->body.execMode = pQuery->execMode;
34,656✔
1272
  switch (pQuery->execMode) {
34,656!
UNCOV
1273
    case QUERY_EXEC_MODE_LOCAL:
×
1274
      if (!pRequest->validateOnly) {
×
1275
        if (NULL == pQuery->pRoot) {
×
1276
          terrno = TSDB_CODE_INVALID_PARA;
×
1277
          code = terrno;
×
1278
        } else {
UNCOV
1279
          code = execLocalCmd(pRequest, pQuery);
×
1280
        }
1281
      }
UNCOV
1282
      break;
×
1283
    case QUERY_EXEC_MODE_RPC:
542✔
1284
      if (!pRequest->validateOnly) {
542!
1285
        code = execDdlQuery(pRequest, pQuery);
542✔
1286
      }
1287
      break;
543✔
1288
    case QUERY_EXEC_MODE_SCHEDULE: {
34,114✔
1289
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
34,114✔
1290
      if (NULL == pMnodeList) {
34,108!
UNCOV
1291
        code = terrno;
×
1292
        break;
×
1293
      }
1294
      SQueryPlan* pDag = NULL;
34,108✔
1295
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
34,108✔
1296
      if (TSDB_CODE_SUCCESS == code) {
34,099!
1297
        pRequest->body.subplanNum = pDag->numOfSubplans;
34,109✔
1298
        if (!pRequest->validateOnly) {
34,109!
1299
          SArray* pNodeList = NULL;
34,109✔
1300
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
34,109✔
1301
          if (TSDB_CODE_SUCCESS == code) {
34,112!
1302
            code = scheduleQuery(pRequest, pDag, pNodeList);
34,115✔
1303
          }
1304
          taosArrayDestroy(pNodeList);
34,102✔
1305
        }
1306
      }
1307
      taosArrayDestroy(pMnodeList);
34,110✔
1308
      break;
34,125✔
1309
    }
UNCOV
1310
    case QUERY_EXEC_MODE_EMPTY_RESULT:
×
UNCOV
1311
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
UNCOV
1312
      break;
×
UNCOV
1313
    default:
×
1314
      break;
×
1315
  }
1316

1317
  if (!keepQuery) {
34,668!
UNCOV
1318
    qDestroyQuery(pQuery);
×
1319
  }
1320

1321
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
34,668!
1322
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
212!
1323
    if (TSDB_CODE_SUCCESS != ret) {
212!
UNCOV
1324
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret, pRequest->requestId);
×
1325
    }
1326
  }
1327

1328
  if (TSDB_CODE_SUCCESS == code) {
34,668✔
1329
    code = handleQueryExecRsp(pRequest);
34,661✔
1330
  }
1331

1332
  if (TSDB_CODE_SUCCESS != code) {
34,672✔
1333
    pRequest->code = code;
130✔
1334
  }
1335

1336
  if (res) {
34,672!
UNCOV
1337
    *res = pRequest->body.resInfo.execRes.res;
×
1338
    pRequest->body.resInfo.execRes.res = NULL;
×
1339
  }
1340
}
34,672✔
1341

1342
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
10,424,381✔
1343
                                 SSqlCallbackWrapper* pWrapper) {
1344
  int32_t code = TSDB_CODE_SUCCESS;
10,424,381✔
1345
  pRequest->type = pQuery->msgType;
10,424,381✔
1346
  SArray*     pMnodeList = NULL;
10,424,381✔
1347
  SQueryPlan* pDag = NULL;
10,424,381✔
1348
  int64_t     st = taosGetTimestampUs();
10,429,013✔
1349

1350
  if (!pRequest->parseOnly) {
10,429,013!
1351
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
10,434,594✔
1352
    if (NULL == pMnodeList) {
10,456,890!
UNCOV
1353
      code = terrno;
×
1354
    }
1355
    SPlanContext cxt = {.queryId = pRequest->requestId,
20,896,253✔
1356
                        .acctId = pRequest->pTscObj->acctId,
10,456,890✔
1357
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
10,456,890✔
1358
                        .pAstRoot = pQuery->pRoot,
10,439,363✔
1359
                        .showRewrite = pQuery->showRewrite,
10,439,363✔
1360
                        .isView = pWrapper->pParseCtx->isView,
10,439,363✔
1361
                        .isAudit = pWrapper->pParseCtx->isAudit,
10,439,363✔
1362
                        .pMsg = pRequest->msgBuf,
10,439,363✔
1363
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1364
                        .pUser = pRequest->pTscObj->user,
10,439,363✔
1365
                        .sysInfo = pRequest->pTscObj->sysInfo,
10,439,363✔
1366
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
10,439,363✔
1367
                        .allocatorId = pRequest->allocatorRefId};
10,439,363✔
1368
    if (TSDB_CODE_SUCCESS == code) {
10,439,363!
1369
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
10,452,101✔
1370
    }
1371
    if (code) {
10,431,432✔
1372
      tscError("req:0x%" PRIx64 ", failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
640!
1373
               pRequest->requestId);
1374
    } else {
1375
      pRequest->body.subplanNum = pDag->numOfSubplans;
10,430,792✔
1376
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
10,430,792✔
1377
    }
1378
  }
1379

1380
  pRequest->metric.execStart = taosGetTimestampUs();
10,413,101✔
1381
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
10,413,101✔
1382

1383
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
20,864,629!
1384
    SArray* pNodeList = NULL;
10,434,891✔
1385
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
10,434,891✔
1386
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
849,062✔
1387
    }
1388

1389
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
10,434,913✔
1390
                             .requestId = pRequest->requestId,
10,404,815✔
1391
                             .requestObjRefId = pRequest->self};
10,404,815✔
1392
    SSchedulerReq    req = {
20,804,834✔
1393
           .syncReq = false,
1394
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
10,404,815✔
1395
           .pConn = &conn,
1396
           .pNodeList = pNodeList,
1397
           .pDag = pDag,
1398
           .allocatorRefId = pRequest->allocatorRefId,
10,404,815✔
1399
           .sql = pRequest->sqlstr,
10,404,815✔
1400
           .startTs = pRequest->metric.start,
10,404,815✔
1401
           .execFp = schedulerExecCb,
1402
           .cbParam = pWrapper,
1403
           .chkKillFp = chkRequestKilled,
1404
           .chkKillParam = (void*)pRequest->self,
10,404,815✔
1405
           .pExecRes = NULL,
1406
           .source = pRequest->source,
10,404,815✔
1407
           .pWorkerCb = getTaskPoolWorkerCb(),
10,404,815✔
1408
    };
1409
    if (TSDB_CODE_SUCCESS == code) {
10,400,019!
1410
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
10,402,103✔
1411
    }
1412

1413
    taosArrayDestroy(pNodeList);
10,455,725✔
1414
  } else {
UNCOV
1415
    qDestroyQueryPlan(pDag);
×
1416
    tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
860✔
1417
             pRequest->requestId);
1418
    destorySqlCallbackWrapper(pWrapper);
860✔
1419
    pRequest->pWrapper = NULL;
860✔
1420
    if (TSDB_CODE_SUCCESS != code) {
860✔
1421
      pRequest->code = terrno;
640✔
1422
    }
1423

1424
    doRequestCallback(pRequest, code);
860✔
1425
  }
1426

1427
  // todo not to be released here
1428
  taosArrayDestroy(pMnodeList);
10,452,388✔
1429

1430
  return code;
10,463,761✔
1431
}
1432

1433
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
10,566,791✔
1434
  int32_t code = 0;
10,566,791✔
1435

1436
  if (pRequest->parseOnly) {
10,566,791✔
1437
    doRequestCallback(pRequest, 0);
608✔
1438
    return;
608✔
1439
  }
1440

1441
  pRequest->body.execMode = pQuery->execMode;
10,566,183✔
1442
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
10,566,183✔
1443
    destorySqlCallbackWrapper(pWrapper);
155,667✔
1444
    pRequest->pWrapper = NULL;
155,667✔
1445
  }
1446

1447
  if (pQuery->pRoot && !pRequest->inRetry) {
10,566,183!
1448
    STscObj*            pTscObj = pRequest->pTscObj;
10,574,383✔
1449
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
10,574,383✔
1450
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
10,574,383✔
1451
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
9,588,332✔
1452
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
9,518,305✔
1453
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
1,056,078✔
1454
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
736,378✔
1455
    }
1456
  }
1457

1458
  switch (pQuery->execMode) {
10,631,306!
1459
    case QUERY_EXEC_MODE_LOCAL:
118,930✔
1460
      asyncExecLocalCmd(pRequest, pQuery);
118,930✔
1461
      break;
118,930✔
1462
    case QUERY_EXEC_MODE_RPC:
33,487✔
1463
      code = asyncExecDdlQuery(pRequest, pQuery);
33,487✔
1464
      break;
33,487✔
1465
    case QUERY_EXEC_MODE_SCHEDULE: {
10,475,639✔
1466
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
10,475,639✔
1467
      break;
10,451,556✔
1468
    }
1469
    case QUERY_EXEC_MODE_EMPTY_RESULT:
3,250✔
1470
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
3,250✔
1471
      doRequestCallback(pRequest, 0);
3,250✔
1472
      break;
3,250✔
UNCOV
1473
    default:
×
1474
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
×
1475
      doRequestCallback(pRequest, -1);
×
1476
      break;
×
1477
  }
1478
}
1479

1480
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
15✔
1481
  SCatalog* pCatalog = NULL;
15✔
1482
  int32_t   code = 0;
15✔
1483
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
15✔
1484
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
15✔
1485

1486
  if (dbNum <= 0 && tblNum <= 0) {
15!
1487
    return TSDB_CODE_APP_ERROR;
15✔
1488
  }
1489

UNCOV
1490
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
×
1491
  if (code != TSDB_CODE_SUCCESS) {
×
1492
    return code;
×
1493
  }
1494

UNCOV
1495
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
×
1496
                           .requestId = pRequest->requestId,
×
1497
                           .requestObjRefId = pRequest->self,
×
1498
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
×
1499

UNCOV
1500
  for (int32_t i = 0; i < dbNum; ++i) {
×
1501
    char* dbFName = taosArrayGet(pRequest->dbList, i);
×
1502

1503
    // catalogRefreshDBVgInfo will handle dbFName == null.
UNCOV
1504
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
×
1505
    if (code != TSDB_CODE_SUCCESS) {
×
1506
      return code;
×
1507
    }
1508
  }
1509

UNCOV
1510
  for (int32_t i = 0; i < tblNum; ++i) {
×
1511
    SName* tableName = taosArrayGet(pRequest->tableList, i);
×
1512

1513
    // catalogRefreshTableMeta will handle tableName == null.
UNCOV
1514
    code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
×
1515
    if (code != TSDB_CODE_SUCCESS) {
×
1516
      return code;
×
1517
    }
1518
  }
1519

UNCOV
1520
  return code;
×
1521
}
1522

1523
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
15,012✔
1524
  SCatalog* pCatalog = NULL;
15,012✔
1525
  int32_t   tbNum = taosArrayGetSize(tbList);
15,012✔
1526
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
15,012✔
1527
  if (code != TSDB_CODE_SUCCESS) {
15,012!
UNCOV
1528
    return code;
×
1529
  }
1530

1531
  if (isView) {
15,012✔
1532
    for (int32_t i = 0; i < tbNum; ++i) {
828✔
1533
      SName* pViewName = taosArrayGet(tbList, i);
414✔
1534
      char   dbFName[TSDB_DB_FNAME_LEN];
1535
      if (NULL == pViewName) {
414!
UNCOV
1536
        continue;
×
1537
      }
1538
      (void)tNameGetFullDbName(pViewName, dbFName);
414✔
1539
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
414!
1540
    }
1541
  } else {
1542
    for (int32_t i = 0; i < tbNum; ++i) {
19,041✔
1543
      SName* pTbName = taosArrayGet(tbList, i);
4,443✔
1544
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
4,443!
1545
    }
1546
  }
1547

1548
  return TSDB_CODE_SUCCESS;
15,012✔
1549
}
1550

1551
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
27,941✔
1552
  pEpSet->version = 0;
27,941✔
1553

1554
  // init mnode ip set
1555
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
27,941✔
1556
  mgmtEpSet->numOfEps = 0;
27,941✔
1557
  mgmtEpSet->inUse = 0;
27,941✔
1558

1559
  if (firstEp && firstEp[0] != 0) {
27,941!
1560
    if (strlen(firstEp) >= TSDB_EP_LEN) {
27,945!
UNCOV
1561
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1562
      return -1;
×
1563
    }
1564

1565
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
27,945✔
1566
    if (code != TSDB_CODE_SUCCESS) {
27,945!
UNCOV
1567
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1568
      return terrno;
×
1569
    }
1570
    uint32_t addr = 0;
27,945✔
1571
    code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
27,945✔
1572
    if (code) {
27,949✔
1573
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
4!
1574
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1575
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
4✔
1576
    } else {
1577
      mgmtEpSet->numOfEps++;
27,945✔
1578
    }
1579
  }
1580

1581
  if (secondEp && secondEp[0] != 0) {
27,945!
1582
    if (strlen(secondEp) >= TSDB_EP_LEN) {
15,948!
UNCOV
1583
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1584
      return terrno;
×
1585
    }
1586

1587
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
15,948✔
1588
    if (code != TSDB_CODE_SUCCESS) {
15,948!
UNCOV
1589
      return code;
×
1590
    }
1591
    uint32_t addr = 0;
15,948✔
1592
    code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
15,948✔
1593
    if (code) {
15,948!
UNCOV
1594
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
×
1595
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
UNCOV
1596
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
×
1597
    } else {
1598
      mgmtEpSet->numOfEps++;
15,948✔
1599
    }
1600
  }
1601

1602
  if (mgmtEpSet->numOfEps == 0) {
27,945✔
1603
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
4✔
1604
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
4✔
1605
  }
1606

1607
  return 0;
27,941✔
1608
}
1609

1610
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
27,945✔
1611
                        SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1612
  *pTscObj = NULL;
27,945✔
1613
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
27,945✔
1614
  if (TSDB_CODE_SUCCESS != code) {
27,945!
UNCOV
1615
    return code;
×
1616
  }
1617

1618
  SRequestObj* pRequest = NULL;
27,945✔
1619
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
27,945✔
1620
  if (TSDB_CODE_SUCCESS != code) {
27,945!
UNCOV
1621
    destroyTscObj(*pTscObj);
×
1622
    return code;
×
1623
  }
1624

1625
  pRequest->sqlstr = taosStrdup("taos_connect");
27,945!
1626
  if (pRequest->sqlstr) {
27,945!
1627
    pRequest->sqlLen = strlen(pRequest->sqlstr);
27,945✔
1628
  } else {
UNCOV
1629
    return terrno;
×
1630
  }
1631

1632
  SMsgSendInfo* body = NULL;
27,945✔
1633
  code = buildConnectMsg(pRequest, &body);
27,945✔
1634
  if (TSDB_CODE_SUCCESS != code) {
27,945!
UNCOV
1635
    destroyTscObj(*pTscObj);
×
1636
    return code;
×
1637
  }
1638

1639
  // int64_t transporterId = 0;
1640
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body);
27,945✔
1641
  if (TSDB_CODE_SUCCESS != code) {
27,945!
UNCOV
1642
    destroyTscObj(*pTscObj);
×
1643
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1644
    return code;
×
1645
  }
1646
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
27,945!
UNCOV
1647
    destroyTscObj(*pTscObj);
×
1648
    tscError("failed to wait sem, code:%s", terrstr());
×
1649
    return terrno;
×
1650
  }
1651
  if (pRequest->code != TSDB_CODE_SUCCESS) {
27,942✔
1652
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
39!
1653
    tscError("failed to connect to server, reason: %s", errorMsg);
39!
1654

1655
    terrno = pRequest->code;
39✔
1656
    destroyRequest(pRequest);
39✔
1657
    taos_close_internal(*pTscObj);
39✔
1658
    *pTscObj = NULL;
39✔
1659
    return terrno;
39✔
1660
  } else {
1661
    tscInfo("connObj:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
27,903✔
1662
             (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1663
    destroyRequest(pRequest);
27,907✔
1664
  }
1665
  return code;
27,906✔
1666
}
1667

1668
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
27,945✔
1669
  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
27,945!
1670
  if (*pMsgSendInfo == NULL) {
27,945!
UNCOV
1671
    return terrno;
×
1672
  }
1673

1674
  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
27,945✔
1675

1676
  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
27,945✔
1677
  (*pMsgSendInfo)->requestId = pRequest->requestId;
27,945✔
1678
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
27,945✔
1679
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
27,945!
1680
  if (NULL == (*pMsgSendInfo)->param) {
27,944!
UNCOV
1681
    taosMemoryFree(*pMsgSendInfo);
×
1682
    return terrno;
×
1683
  }
1684

1685
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
27,944✔
1686

1687
  SConnectReq connectReq = {0};
27,944✔
1688
  STscObj*    pObj = pRequest->pTscObj;
27,944✔
1689

1690
  char* db = getDbOfConnection(pObj);
27,944✔
1691
  if (db != NULL) {
27,945✔
1692
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
796✔
1693
  } else if (terrno) {
27,149!
UNCOV
1694
    taosMemoryFree(*pMsgSendInfo);
×
1695
    return terrno;
×
1696
  }
1697
  taosMemoryFreeClear(db);
27,945!
1698

1699
  connectReq.connType = pObj->connType;
27,945✔
1700
  connectReq.pid = appInfo.pid;
27,945✔
1701
  connectReq.startTime = appInfo.startTime;
27,945✔
1702

1703
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
27,945✔
1704
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
27,945✔
1705
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
27,945✔
1706
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
27,945✔
1707

1708
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
27,945✔
1709
  void*   pReq = taosMemoryMalloc(contLen);
27,944!
1710
  if (NULL == pReq) {
27,945!
UNCOV
1711
    taosMemoryFree(*pMsgSendInfo);
×
1712
    return terrno;
×
1713
  }
1714

1715
  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
27,945!
UNCOV
1716
    taosMemoryFree(*pMsgSendInfo);
×
1717
    taosMemoryFree(pReq);
×
1718
    return terrno;
×
1719
  }
1720

1721
  (*pMsgSendInfo)->msgInfo.len = contLen;
27,945✔
1722
  (*pMsgSendInfo)->msgInfo.pData = pReq;
27,945✔
1723
  return TSDB_CODE_SUCCESS;
27,945✔
1724
}
1725

1726
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
18,046,531✔
1727
  if (NULL == pEpSet) {
18,046,531✔
1728
    return;
16,741,542✔
1729
  }
1730

1731
  switch (pSendInfo->target.type) {
1,304,989✔
1732
    case TARGET_TYPE_MNODE:
60✔
1733
      if (NULL == pTscObj) {
60!
UNCOV
1734
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1735
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
UNCOV
1736
        return;
×
1737
      }
1738

1739
      SEpSet* pOrig = &pTscObj->pAppInfo->mgmtEp.epSet;
60✔
1740
      SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
60✔
1741
      SEp*    pNewEp = &pEpSet->eps[pEpSet->inUse];
60✔
1742
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
60✔
1743
               pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
1744
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
60✔
1745
      break;
60✔
1746
    case TARGET_TYPE_VNODE: {
749,777✔
1747
      if (NULL == pTscObj) {
749,777✔
1748
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
3!
1749
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1750
        return;
3✔
1751
      }
1752

1753
      SCatalog* pCatalog = NULL;
749,774✔
1754
      int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
749,774✔
1755
      if (code != TSDB_CODE_SUCCESS) {
749,776!
UNCOV
1756
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1757
                 tstrerror(code));
UNCOV
1758
        return;
×
1759
      }
1760

1761
      code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
749,776✔
1762
      if (code != TSDB_CODE_SUCCESS) {
749,774!
UNCOV
1763
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1764
                 tstrerror(code));
UNCOV
1765
        return;
×
1766
      }
1767
      taosMemoryFreeClear(pSendInfo->target.dbFName);
749,774!
1768
      break;
749,776✔
1769
    }
1770
    default:
555,152✔
1771
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
555,152!
1772
      break;
556,074✔
1773
  }
1774
}
1775

1776
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
18,051,006✔
1777
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
18,051,006✔
1778
  if (pMsg->info.ahandle == NULL) {
18,051,006✔
1779
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
152!
1780
    rpcFreeCont(pMsg->pCont);
152✔
1781
    taosMemoryFree(pEpSet);
152!
1782
    return TSDB_CODE_TSC_INTERNAL_ERROR;
152✔
1783
  }
1784

1785
  STscObj* pTscObj = NULL;
18,050,854✔
1786

1787
  STraceId* trace = &pMsg->info.traceId;
18,050,854✔
1788
  char      tbuf[40] = {0};
18,050,854✔
1789
  TRACE_TO_STR(trace, tbuf);
18,050,854!
1790

1791
  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
18,049,620!
1792
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));
1793

1794
  if (pSendInfo->requestObjRefId != 0) {
18,049,914✔
1795
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
12,414,870✔
1796
    if (pRequest) {
12,413,580✔
1797
      if (pRequest->self != pSendInfo->requestObjRefId) {
12,411,853!
UNCOV
1798
        tscError("doProcessMsgFromServer req:0x%" PRId64 " != pSendInfo->requestObjRefId:0x%" PRId64,
×
1799
                 pRequest->self, pSendInfo->requestObjRefId);
1800

UNCOV
1801
        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
×
1802
          tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1803
        }
UNCOV
1804
        rpcFreeCont(pMsg->pCont);
×
1805
        taosMemoryFree(pEpSet);
×
1806
        destroySendMsgInfo(pSendInfo);
×
1807
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1808
      }
1809
      pTscObj = pRequest->pTscObj;
12,411,853✔
1810
    }
1811
  }
1812

1813
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
18,048,624✔
1814

1815
  SDataBuf buf = {.msgType = pMsg->msgType,
18,046,409✔
1816
                  .len = pMsg->contLen,
18,046,409✔
1817
                  .pData = NULL,
1818
                  .handle = pMsg->info.handle,
18,046,409✔
1819
                  .handleRefId = pMsg->info.refId,
18,046,409✔
1820
                  .pEpSet = pEpSet};
1821

1822
  if (pMsg->contLen > 0) {
18,046,409✔
1823
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
16,973,875!
1824
    if (buf.pData == NULL) {
16,972,683!
UNCOV
1825
      pMsg->code = terrno;
×
1826
    } else {
1827
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
16,972,683✔
1828
    }
1829
  }
1830

1831
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
18,045,217✔
1832

1833
  if (pTscObj) {
18,030,647✔
1834
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
12,396,480✔
1835
    if (TSDB_CODE_SUCCESS != code) {
12,406,876!
UNCOV
1836
      tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1837
      terrno = code;
×
1838
      pMsg->code = code;
×
1839
    }
1840
  }
1841

1842
  rpcFreeCont(pMsg->pCont);
18,041,043✔
1843
  destroySendMsgInfo(pSendInfo);
18,047,146✔
1844
  return TSDB_CODE_SUCCESS;
18,048,659✔
1845
}
1846
int32_t doProcessMsgFromServer(void* param) {
18,051,919✔
1847
  AsyncArg* arg = (AsyncArg*)param;
18,051,919✔
1848
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
18,051,919✔
1849
  taosMemoryFree(arg);
18,044,774!
1850
  return code;
18,049,655✔
1851
}
1852

1853
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
18,025,891✔
1854
  int32_t code = 0;
18,025,891✔
1855
  SEpSet* tEpSet = NULL;
18,025,891✔
1856
  if (pEpSet != NULL) {
18,025,891✔
1857
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
1,305,909!
1858
    if (NULL == tEpSet) {
1,305,904!
UNCOV
1859
      code = terrno;
×
1860
      pMsg->code = terrno;
×
1861
      goto _exit;
×
1862
    }
1863
    (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
1,305,904✔
1864
  }
1865

1866
  // pMsg is response msg
1867
  if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
18,025,886✔
1868
    // restore origin code
1869
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
27,945!
UNCOV
1870
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
×
1871
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
27,945!
UNCOV
1872
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
×
1873
    }
1874
  } else {
1875
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
1876
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
17,997,941!
UNCOV
1877
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
×
1878
    }
1879
  }
1880

1881
  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
18,025,886!
1882
  if (NULL == arg) {
18,025,165!
UNCOV
1883
    code = terrno;
×
1884
    pMsg->code = code;
×
1885
    goto _exit;
×
1886
  }
1887

1888
  arg->msg = *pMsg;
18,025,165✔
1889
  arg->pEpset = tEpSet;
18,025,165✔
1890

1891
  if ((code = taosAsyncExec(doProcessMsgFromServer, arg, NULL)) != 0) {
18,025,165✔
1892
    pMsg->code = code;
1,977✔
1893
    taosMemoryFree(arg);
1,977!
UNCOV
1894
    goto _exit;
×
1895
  }
1896
  return;
18,042,926✔
UNCOV
1897
_exit:
×
1898
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
×
1899
  code = doProcessMsgFromServerImpl(pMsg, tEpSet);
×
1900
  if (code != 0) {
×
1901
    tscError("failed to sched msg to tsc, tsc ready quit");
×
1902
  }
1903
}
1904

1905
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
4✔
1906
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
4!
1907
  if (user == NULL) {
4!
UNCOV
1908
    user = TSDB_DEFAULT_USER;
×
1909
  }
1910

1911
  if (auth == NULL) {
4!
UNCOV
1912
    tscError("No auth info is given, failed to connect to server");
×
1913
    return NULL;
×
1914
  }
1915

1916
  STscObj* pObj = NULL;
4✔
1917
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
4✔
1918
  if (TSDB_CODE_SUCCESS == code) {
4✔
1919
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1!
1920
    if (NULL == rid) {
1!
UNCOV
1921
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
1922
    }
1923
    *rid = pObj->id;
1✔
1924
    return (TAOS*)rid;
1✔
1925
  }
1926

1927
  return NULL;
3✔
1928
}
1929

1930
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
1931
//                      const char* db, int dbLen, uint16_t port) {
1932
//   char ipStr[TSDB_EP_LEN] = {0};
1933
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
1934
//   char userStr[TSDB_USER_LEN] = {0};
1935
//   char passStr[TSDB_PASSWORD_LEN] = {0};
1936
//
1937
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
1938
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
1939
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
1940
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
1941
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
1942
// }
1943

1944
void doSetOneRowPtr(SReqResultInfo* pResultInfo, bool isStmt) {
31,447,350✔
1945
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
156,781,906✔
1946
    SResultColumn* pCol = &pResultInfo->pCol[i];
125,349,899✔
1947

1948
    int32_t type = pResultInfo->fields[i].type;
125,349,899✔
1949
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->fields[i].bytes, isStmt);
125,349,899✔
1950

1951
    if (IS_VAR_DATA_TYPE(type)) {
125,334,556✔
1952
      if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pCol->offset[pResultInfo->current] != -1) {
24,682,644!
1953
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
23,728,965✔
1954

1955
        pResultInfo->length[i] = varDataLen(pStart);
23,728,965✔
1956
        pResultInfo->row[i] = varDataVal(pStart);
23,728,965✔
1957
      } else {
1958
        pResultInfo->row[i] = NULL;
953,679✔
1959
        pResultInfo->length[i] = 0;
953,679✔
1960
      }
1961
    } else {
1962
      if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
100,651,912✔
1963
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + schemaBytes * pResultInfo->current;
97,001,757✔
1964
        pResultInfo->length[i] = schemaBytes;
97,001,757✔
1965
      } else {
1966
        pResultInfo->row[i] = NULL;
3,650,155✔
1967
        pResultInfo->length[i] = 0;
3,650,155✔
1968
      }
1969
    }
1970
  }
1971
}
31,432,007✔
1972

UNCOV
1973
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
×
1974
  if (pRequest == NULL) {
×
1975
    return NULL;
×
1976
  }
1977

UNCOV
1978
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
×
1979
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
×
1980
    // All data has returned to App already, no need to try again
UNCOV
1981
    if (pResultInfo->completed) {
×
1982
      pResultInfo->numOfRows = 0;
×
1983
      return NULL;
×
1984
    }
1985

UNCOV
1986
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
×
1987
    SSchedulerReq   req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
×
1988

UNCOV
1989
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
×
1990
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
1991
      pResultInfo->numOfRows = 0;
×
1992
      return NULL;
×
1993
    }
1994

UNCOV
1995
    pRequest->code =
×
1996
        setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData, convertUcs4, pRequest->isStmtBind);
×
1997
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
1998
      pResultInfo->numOfRows = 0;
×
1999
      return NULL;
×
2000
    }
2001

UNCOV
2002
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
×
2003
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
2004

UNCOV
2005
    STscObj*            pTscObj = pRequest->pTscObj;
×
2006
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
×
2007
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
×
2008

UNCOV
2009
    if (pResultInfo->numOfRows == 0) {
×
2010
      return NULL;
×
2011
    }
2012
  }
2013

UNCOV
2014
  if (setupOneRowPtr) {
×
2015
    doSetOneRowPtr(pResultInfo, pRequest->isStmtBind);
×
2016
    pResultInfo->current += 1;
×
2017
  }
2018

UNCOV
2019
  return pResultInfo->row;
×
2020
}
2021

2022
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
772,852✔
2023
  tsem_t* sem = param;
772,852✔
2024
  if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
772,852!
UNCOV
2025
    tscError("failed to post sem, code:%s", terrstr());
×
2026
  }
2027
}
772,859✔
2028

2029
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
17,492,183✔
2030
  if (pRequest == NULL) {
17,492,183!
UNCOV
2031
    return NULL;
×
2032
  }
2033

2034
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
17,492,183✔
2035
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
17,492,183✔
2036
    // All data has returned to App already, no need to try again
2037
    if (pResultInfo->completed) {
1,348,239✔
2038
      pResultInfo->numOfRows = 0;
575,384✔
2039
      return NULL;
575,384✔
2040
    }
2041

2042
    // convert ucs4 to native multi-bytes string
2043
    pResultInfo->convertUcs4 = convertUcs4;
772,855✔
2044
    tsem_t sem;
2045
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
772,855!
UNCOV
2046
      tscError("failed to init sem, code:%s", terrstr());
×
2047
    }
2048
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
772,842✔
2049
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
772,863!
UNCOV
2050
      tscError("failed to wait sem, code:%s", terrstr());
×
2051
    }
2052
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
772,848!
UNCOV
2053
      tscError("failed to destroy sem, code:%s", terrstr());
×
2054
    }
2055
    pRequest->inCallback = false;
772,841✔
2056
  }
2057

2058
  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
16,916,785!
2059
    return NULL;
58,017✔
2060
  } else {
2061
    if (setupOneRowPtr) {
16,858,768✔
2062
      doSetOneRowPtr(pResultInfo, pRequest->isStmtBind);
16,209,213✔
2063
      pResultInfo->current += 1;
16,207,740✔
2064
    }
2065

2066
    return pResultInfo->row;
16,857,295✔
2067
  }
2068
}
2069

2070
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
1,225,238✔
2071
  if (pResInfo->row == NULL) {
1,225,238✔
2072
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
1,104,468!
2073
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
1,104,530!
2074
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
1,104,527!
2075
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
1,104,530!
2076

2077
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
1,104,524!
UNCOV
2078
      taosMemoryFree(pResInfo->row);
×
UNCOV
2079
      taosMemoryFree(pResInfo->pCol);
×
2080
      taosMemoryFree(pResInfo->length);
×
2081
      taosMemoryFree(pResInfo->convertBuf);
×
2082
      return terrno;
×
2083
    }
2084
  }
2085

2086
  return TSDB_CODE_SUCCESS;
1,225,300✔
2087
}
2088

2089
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
1,223,951✔
2090
  int32_t idx = -1;
1,223,951✔
2091
  iconv_t conv = taosAcquireConv(&idx, C2M, pResultInfo->charsetCxt);
1,223,951✔
2092
  if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
1,223,989!
2093

2094
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
5,125,424✔
2095
    int32_t type = pResultInfo->fields[i].type;
3,901,438✔
2096
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
3,901,438✔
2097

2098
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
3,901,426✔
2099
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
172,495!
2100
      if (p == NULL) {
172,495!
UNCOV
2101
        taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
×
2102
        return terrno;
×
2103
      }
2104

2105
      pResultInfo->convertBuf[i] = p;
172,495✔
2106

2107
      SResultColumn* pCol = &pResultInfo->pCol[i];
172,495✔
2108
      for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
25,963,901✔
2109
        if (pCol->offset[j] != -1) {
25,791,397✔
2110
          char* pStart = pCol->offset[j] + pCol->pData;
25,484,873✔
2111

2112
          int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p), conv);
25,484,873✔
2113
          if (len < 0 || len > schemaBytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
25,484,895✔
2114
            tscError(
13!
2115
                "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
2116
                "colLength[i]):%p",
2117
                len, schemaBytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
2118
            taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
13✔
UNCOV
2119
            return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2120
          }
2121

2122
          varDataSetLen(p, len);
25,484,882✔
2123
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
25,484,882✔
2124
          p += (len + VARSTR_HEADER_SIZE);
25,484,882✔
2125
        }
2126
      }
2127

2128
      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
172,504✔
2129
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
172,504✔
2130
    }
2131
  }
2132
  taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
1,223,986✔
2133
  return TSDB_CODE_SUCCESS;
1,223,995✔
2134
}
2135

2136
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
1,223,990✔
2137
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
5,125,598✔
2138
    TAOS_FIELD_E* pField = pResultInfo->fields + i;
3,901,608✔
2139
    int32_t type = pField->type;
3,901,608✔
2140
    int32_t bufLen = 0;
3,901,608✔
2141
    char* p = NULL;
3,901,608✔
2142
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[i].pData) {
3,901,608!
2143
      continue;
3,895,094✔
2144
    } else {
2145
      bufLen = 64;
6,514✔
2146
      p = taosMemoryRealloc(pResultInfo->convertBuf[i], bufLen * pResultInfo->numOfRows);
6,514!
2147
      pField->bytes = bufLen;
6,514✔
2148
    }
2149
    if (!p) return terrno;
6,514!
2150
    pResultInfo->convertBuf[i] = p;
6,514✔
2151

2152
    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
4,355,887✔
2153
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[i].pData + j * tDataTypes[type].bytes), type,
4,349,373✔
2154
                                  pField->precision, pField->scale, p, bufLen);
4,349,373✔
2155
      p += bufLen;
4,349,373✔
2156
      if (TSDB_CODE_SUCCESS != code) {
4,349,373!
UNCOV
2157
        return code;
×
2158
      }
2159
    }
2160
    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
6,514✔
2161
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
6,514✔
2162
  }
2163
  return 0;
1,223,990✔
2164
}
2165

2166
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
1,244✔
2167
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t) +
1,244✔
2168
         numOfCols * (sizeof(int8_t) + sizeof(int32_t));
2169
}
2170

2171
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
622✔
2172
  char*   p = (char*)pResultInfo->pData;
622✔
2173
  int32_t blockVersion = *(int32_t*)p;
622✔
2174

2175
  int32_t numOfRows = pResultInfo->numOfRows;
622✔
2176
  int32_t numOfCols = pResultInfo->numOfCols;
622✔
2177

2178
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2179
  // length |
2180
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
622✔
2181
  if (numOfCols != cols) {
622!
UNCOV
2182
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
UNCOV
2183
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2184
  }
2185

2186
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
622✔
2187
  int32_t* colLength = (int32_t*)(p + len);
622✔
2188
  len += sizeof(int32_t) * numOfCols;
622✔
2189

2190
  char* pStart = p + len;
622✔
2191
  for (int32_t i = 0; i < numOfCols; ++i) {
3,141✔
2192
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
2,519!
2193

2194
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,519✔
2195
      int32_t* offset = (int32_t*)pStart;
705✔
2196
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
705✔
2197
      len += lenTmp;
705✔
2198
      pStart += lenTmp;
705✔
2199

2200
      int32_t estimateColLen = 0;
705✔
2201
      for (int32_t j = 0; j < numOfRows; ++j) {
3,783✔
2202
        if (offset[j] == -1) {
3,078✔
2203
          continue;
233✔
2204
        }
2205
        char* data = offset[j] + pStart;
2,845✔
2206

2207
        int32_t jsonInnerType = *data;
2,845✔
2208
        char*   jsonInnerData = data + CHAR_BYTES;
2,845✔
2209
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
2,845✔
2210
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
78✔
2211
        } else if (tTagIsJson(data)) {
2,767✔
2212
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
947✔
2213
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
1,820✔
2214
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
1,482✔
2215
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
338✔
2216
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
246✔
2217
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
92!
2218
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
92✔
2219
        } else {
UNCOV
2220
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
UNCOV
2221
          return -1;
×
2222
        }
2223
      }
2224
      len += TMAX(colLen, estimateColLen);
705✔
2225
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,814!
2226
      int32_t lenTmp = numOfRows * sizeof(int32_t);
658✔
2227
      len += (lenTmp + colLen);
658✔
2228
      pStart += lenTmp;
658✔
2229
    } else {
2230
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
1,156✔
2231
      len += (lenTmp + colLen);
1,156✔
2232
      pStart += lenTmp;
1,156✔
2233
    }
2234
    pStart += colLen;
2,519✔
2235
  }
2236

2237
  // Ensure the complete structure of the block, including the blankfill field,
2238
  // even though it is not used on the client side.
2239
  len += sizeof(bool);
622✔
2240
  return len;
622✔
2241
}
2242

2243
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
1,225,301✔
2244
  int32_t numOfRows = pResultInfo->numOfRows;
1,225,301✔
2245
  int32_t numOfCols = pResultInfo->numOfCols;
1,225,301✔
2246
  bool    needConvert = false;
1,225,301✔
2247
  for (int32_t i = 0; i < numOfCols; ++i) {
5,133,416✔
2248
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
3,908,737✔
2249
      needConvert = true;
622✔
2250
      break;
622✔
2251
    }
2252
  }
2253

2254
  if (!needConvert) {
1,225,301✔
2255
    return TSDB_CODE_SUCCESS;
1,224,679✔
2256
  }
2257

2258
  tscDebug("start to convert form json format string");
622✔
2259

2260
  char*   p = (char*)pResultInfo->pData;
622✔
2261
  int32_t blockVersion = *(int32_t*)p;
622✔
2262
  int32_t dataLen = estimateJsonLen(pResultInfo);
622✔
2263
  if (dataLen <= 0) {
622!
UNCOV
2264
    tscError("doConvertJson error: estimateJsonLen failed");
×
UNCOV
2265
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2266
  }
2267

2268
  taosMemoryFreeClear(pResultInfo->convertJson);
622!
2269
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
622!
2270
  if (pResultInfo->convertJson == NULL) return terrno;
622!
2271
  char* p1 = pResultInfo->convertJson;
622✔
2272

2273
  int32_t totalLen = 0;
622✔
2274
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
622✔
2275
  if (numOfCols != cols) {
622!
UNCOV
2276
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
×
UNCOV
2277
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2278
  }
2279

2280
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
622✔
2281
  (void)memcpy(p1, p, len);
622✔
2282

2283
  p += len;
622✔
2284
  p1 += len;
622✔
2285
  totalLen += len;
622✔
2286

2287
  len = sizeof(int32_t) * numOfCols;
622✔
2288
  int32_t* colLength = (int32_t*)p;
622✔
2289
  int32_t* colLength1 = (int32_t*)p1;
622✔
2290
  (void)memcpy(p1, p, len);
622✔
2291
  p += len;
622✔
2292
  p1 += len;
622✔
2293
  totalLen += len;
622✔
2294

2295
  char* pStart = p;
622✔
2296
  char* pStart1 = p1;
622✔
2297
  for (int32_t i = 0; i < numOfCols; ++i) {
3,141✔
2298
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
2,519!
2299
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
2,519!
2300
    if (colLen >= dataLen) {
2,519!
2301
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
×
2302
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2303
    }
2304
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,519✔
2305
      int32_t* offset = (int32_t*)pStart;
705✔
2306
      int32_t* offset1 = (int32_t*)pStart1;
705✔
2307
      len = numOfRows * sizeof(int32_t);
705✔
2308
      (void)memcpy(pStart1, pStart, len);
705✔
2309
      pStart += len;
705✔
2310
      pStart1 += len;
705✔
2311
      totalLen += len;
705✔
2312

2313
      len = 0;
705✔
2314
      for (int32_t j = 0; j < numOfRows; ++j) {
3,783✔
2315
        if (offset[j] == -1) {
3,078✔
2316
          continue;
233✔
2317
        }
2318
        char* data = offset[j] + pStart;
2,845✔
2319

2320
        int32_t jsonInnerType = *data;
2,845✔
2321
        char*   jsonInnerData = data + CHAR_BYTES;
2,845✔
2322
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
2,845✔
2323
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
2,845✔
2324
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
78✔
2325
          varDataSetLen(dst, strlen(varDataVal(dst)));
78✔
2326
        } else if (tTagIsJson(data)) {
2,767✔
2327
          char* jsonString = NULL;
947✔
2328
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
947✔
2329
          if (jsonString == NULL) {
947!
UNCOV
2330
            tscError("doConvertJson error: parseTagDatatoJson failed");
×
UNCOV
2331
            return terrno;
×
2332
          }
2333
          STR_TO_VARSTR(dst, jsonString);
947✔
2334
          taosMemoryFree(jsonString);
947!
2335
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
1,820✔
2336
          *(char*)varDataVal(dst) = '\"';
1,482✔
2337
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
1,482✔
2338
                                         varDataVal(dst) + CHAR_BYTES, pResultInfo->charsetCxt);
2339
          if (length <= 0) {
1,482✔
2340
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
6!
2341
              pResultInfo->charsetCxt != NULL ? ((SConvInfo *)(pResultInfo->charsetCxt))->charset : tsCharset);
2342
            length = 0;
6✔
2343
          }
2344
          varDataSetLen(dst, length + CHAR_BYTES * 2);
1,482✔
2345
          *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
1,482✔
2346
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
338✔
2347
          double jsonVd = *(double*)(jsonInnerData);
246✔
2348
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
246✔
2349
          varDataSetLen(dst, strlen(varDataVal(dst)));
246✔
2350
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
92!
2351
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
92✔
2352
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
92✔
2353
          varDataSetLen(dst, strlen(varDataVal(dst)));
92✔
2354
        } else {
UNCOV
2355
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
×
UNCOV
2356
          return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2357
        }
2358

2359
        offset1[j] = len;
2,845✔
2360
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
2,845✔
2361
        len += varDataTLen(dst);
2,845✔
2362
      }
2363
      colLen1 = len;
705✔
2364
      totalLen += colLen1;
705✔
2365
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
705!
2366
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,814!
2367
      len = numOfRows * sizeof(int32_t);
658✔
2368
      (void)memcpy(pStart1, pStart, len);
658✔
2369
      pStart += len;
658✔
2370
      pStart1 += len;
658✔
2371
      totalLen += len;
658✔
2372
      totalLen += colLen;
658✔
2373
      (void)memcpy(pStart1, pStart, colLen);
658✔
2374
    } else {
2375
      len = BitmapLen(pResultInfo->numOfRows);
1,156✔
2376
      (void)memcpy(pStart1, pStart, len);
1,156✔
2377
      pStart += len;
1,156✔
2378
      pStart1 += len;
1,156✔
2379
      totalLen += len;
1,156✔
2380
      totalLen += colLen;
1,156✔
2381
      (void)memcpy(pStart1, pStart, colLen);
1,156✔
2382
    }
2383
    pStart += colLen;
2,519✔
2384
    pStart1 += colLen1;
2,519✔
2385
  }
2386

2387
  // Ensure the complete structure of the block, including the blankfill field,
2388
  // even though it is not used on the client side.
2389
  // (void)memcpy(pStart1, pStart, sizeof(bool));
2390
  totalLen += sizeof(bool);
622✔
2391

2392
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
622✔
2393
  pResultInfo->pData = pResultInfo->convertJson;
622✔
2394
  return TSDB_CODE_SUCCESS;
622✔
2395
}
2396

2397
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
1,288,534✔
2398
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
1,288,534!
UNCOV
2399
    tscError("setResultDataPtr paras error");
×
UNCOV
2400
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2401
  }
2402

2403
  if (pResultInfo->numOfRows == 0) {
1,288,541✔
2404
    return TSDB_CODE_SUCCESS;
63,297✔
2405
  }
2406

2407
  if (pResultInfo->pData == NULL) {
1,225,244!
2408
    tscError("setResultDataPtr error: pData is NULL");
×
UNCOV
2409
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2410
  }
2411

2412
  int32_t code = doPrepareResPtr(pResultInfo);
1,225,244✔
2413
  if (code != TSDB_CODE_SUCCESS) {
1,225,301!
UNCOV
2414
    return code;
×
2415
  }
2416
  code = doConvertJson(pResultInfo);
1,225,301✔
2417
  if (code != TSDB_CODE_SUCCESS) {
1,225,297!
UNCOV
2418
    return code;
×
2419
  }
2420

2421
  char* p = (char*)pResultInfo->pData;
1,225,297✔
2422

2423
  // version:
2424
  int32_t blockVersion = *(int32_t*)p;
1,225,297✔
2425
  p += sizeof(int32_t);
1,225,297✔
2426

2427
  int32_t dataLen = *(int32_t*)p;
1,225,297✔
2428
  p += sizeof(int32_t);
1,225,297✔
2429

2430
  int32_t rows = *(int32_t*)p;
1,225,297✔
2431
  p += sizeof(int32_t);
1,225,297✔
2432

2433
  int32_t cols = *(int32_t*)p;
1,225,297✔
2434
  p += sizeof(int32_t);
1,225,297✔
2435

2436
  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
1,225,297!
UNCOV
2437
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
×
2438
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
UNCOV
2439
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2440
  }
2441

2442
  int32_t hasColumnSeg = *(int32_t*)p;
1,225,299✔
2443
  p += sizeof(int32_t);
1,225,299✔
2444

2445
  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)p);
1,225,299✔
2446
  p += sizeof(uint64_t);
1,225,299✔
2447

2448
  // check fields
2449
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
5,134,042✔
2450
    int8_t type = *(int8_t*)p;
3,908,755✔
2451
    p += sizeof(int8_t);
3,908,755✔
2452

2453
    int32_t bytes = *(int32_t*)p;
3,908,755✔
2454
    p += sizeof(int32_t);
3,908,755✔
2455

2456
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
3,908,755!
UNCOV
2457
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
×
2458
    }
2459
  }
2460

2461
  int32_t* colLength = (int32_t*)p;
1,225,287✔
2462
  p += sizeof(int32_t) * pResultInfo->numOfCols;
1,225,287✔
2463

2464
  char* pStart = p;
1,225,287✔
2465
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
5,133,870✔
2466
    if ((pStart - pResultInfo->pData) >= dataLen) {
3,908,580!
UNCOV
2467
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
×
2468
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2469
    }
2470
    if (blockVersion == BLOCK_VERSION_1) {
3,908,580✔
2471
      colLength[i] = htonl(colLength[i]);
2,082,496✔
2472
    }
2473
    if (colLength[i] >= dataLen) {
3,908,580!
UNCOV
2474
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
×
UNCOV
2475
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2476
    }
2477
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
3,908,580!
2478
      tscError("invalid type %d", pResultInfo->fields[i].type);
5!
UNCOV
2479
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2480
    }
2481
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
3,908,575✔
2482
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
1,036,001✔
2483
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
1,036,001✔
2484
    } else {
2485
      pResultInfo->pCol[i].nullbitmap = pStart;
2,872,574✔
2486
      pStart += BitmapLen(pResultInfo->numOfRows);
2,872,574✔
2487
    }
2488

2489
    pResultInfo->pCol[i].pData = pStart;
3,908,575✔
2490
    pResultInfo->length[i] = calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
3,908,575✔
2491
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
3,908,583✔
2492

2493
    pStart += colLength[i];
3,908,583✔
2494
  }
2495

2496
  p = pStart;
1,225,290✔
2497
  // bool blankFill = *(bool*)p;
2498
  p += sizeof(bool);
1,225,290✔
2499
  int32_t offset = p - pResultInfo->pData;
1,225,290✔
2500
  if (offset > dataLen) {
1,225,290!
UNCOV
2501
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
×
UNCOV
2502
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2503
  }
2504

2505
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
2506
  if (convertUcs4) {
1,225,290✔
2507
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
1,223,969✔
2508
  }
2509
#endif
2510
  if (TSDB_CODE_SUCCESS == code && convertUcs4) {
1,225,317✔
2511
    code = convertDecimalType(pResultInfo);
1,223,996✔
2512
  }
2513
  return code;
1,225,281✔
2514
}
2515

2516
char* getDbOfConnection(STscObj* pObj) {
10,807,576✔
2517
  terrno = TSDB_CODE_SUCCESS;
10,807,576✔
2518
  char* p = NULL;
10,807,476✔
2519
  (void)taosThreadMutexLock(&pObj->mutex);
10,807,476✔
2520
  size_t len = strlen(pObj->db);
10,815,820✔
2521
  if (len > 0) {
10,815,820✔
2522
    p = taosStrndup(pObj->db, tListLen(pObj->db));
10,461,854!
2523
    if (p == NULL) {
10,458,416!
UNCOV
2524
      tscError("failed to taosStrndup db name");
×
2525
    }
2526
  }
2527

2528
  (void)taosThreadMutexUnlock(&pObj->mutex);
10,812,382✔
2529
  return p;
10,816,046✔
2530
}
2531

2532
void setConnectionDB(STscObj* pTscObj, const char* db) {
7,611✔
2533
  if (db == NULL || pTscObj == NULL) {
7,611!
UNCOV
2534
    tscError("setConnectionDB para is NULL");
×
UNCOV
2535
    return;
×
2536
  }
2537

2538
  (void)taosThreadMutexLock(&pTscObj->mutex);
7,611✔
2539
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
7,611✔
2540
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
7,611✔
2541
}
2542

UNCOV
2543
void resetConnectDB(STscObj* pTscObj) {
×
UNCOV
2544
  if (pTscObj == NULL) {
×
2545
    return;
×
2546
  }
2547

UNCOV
2548
  (void)taosThreadMutexLock(&pTscObj->mutex);
×
UNCOV
2549
  pTscObj->db[0] = 0;
×
UNCOV
2550
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
×
2551
}
2552

2553
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4, bool isStmt) {
954,401✔
2554
  if (pResultInfo == NULL || pRsp == NULL) {
954,401!
UNCOV
2555
    tscError("setQueryResultFromRsp paras is null");
×
UNCOV
2556
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2557
  }
2558

2559
  taosMemoryFreeClear(pResultInfo->pRspMsg);
954,418!
2560
  pResultInfo->pRspMsg = (const char*)pRsp;
954,418✔
2561
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
954,418✔
2562
  pResultInfo->current = 0;
954,405✔
2563
  pResultInfo->completed = (pRsp->completed == 1);
954,405✔
2564
  pResultInfo->precision = pRsp->precision;
954,405✔
2565

2566
  // decompress data if needed
2567
  int32_t payloadLen = htonl(pRsp->payloadLen);
954,405✔
2568

2569
  if (pRsp->compressed) {
954,405✔
2570
    if (pResultInfo->decompBuf == NULL) {
646✔
2571
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
14!
2572
      if (pResultInfo->decompBuf == NULL) {
14!
UNCOV
2573
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
UNCOV
2574
        return terrno;
×
2575
      }
2576
      pResultInfo->decompBufSize = payloadLen;
14✔
2577
    } else {
2578
      if (pResultInfo->decompBufSize < payloadLen) {
632✔
2579
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
12!
2580
        if (p == NULL) {
12!
UNCOV
2581
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
UNCOV
2582
          return terrno;
×
2583
        }
2584

2585
        pResultInfo->decompBuf = p;
12✔
2586
        pResultInfo->decompBufSize = payloadLen;
12✔
2587
      }
2588
    }
2589
  }
2590

2591
  if (payloadLen > 0) {
954,405✔
2592
    int32_t compLen = *(int32_t*)pRsp->data;
891,104✔
2593
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
891,104✔
2594

2595
    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
891,104✔
2596

2597
    if (pRsp->compressed && compLen < rawLen) {
891,104!
2598
      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
646✔
2599
      if (len < 0) {
646!
UNCOV
2600
        tscError("tsDecompressString failed");
×
UNCOV
2601
        return terrno ? terrno : TSDB_CODE_FAILED;
×
2602
      }
2603
      if (len != rawLen) {
646!
UNCOV
2604
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
×
UNCOV
2605
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2606
      }
2607
      pResultInfo->pData = pResultInfo->decompBuf;
646✔
2608
      pResultInfo->payloadLen = rawLen;
646✔
2609
    } else {
2610
      pResultInfo->pData = pStart;
890,458✔
2611
      pResultInfo->payloadLen = htonl(pRsp->compLen);
890,458✔
2612
      if (pRsp->compLen != pRsp->payloadLen) {
890,458!
UNCOV
2613
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
×
UNCOV
2614
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2615
      }
2616
    }
2617
  }
2618

2619
  // TODO handle the compressed case
2620
  pResultInfo->totalRows += pResultInfo->numOfRows;
954,405✔
2621

2622
  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
954,405✔
2623
  return code;
954,413✔
2624
}
2625

2626
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
5✔
2627
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
5✔
2628
  void*              clientRpc = NULL;
5✔
2629
  SServerStatusRsp   statusRsp = {0};
5✔
2630
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
5✔
2631
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
5✔
2632
  SRpcMsg  rpcRsp = {0};
5✔
2633
  SRpcInit rpcInit = {0};
5✔
2634
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
5✔
2635

2636
  rpcInit.label = "CHK";
5✔
2637
  rpcInit.numOfThreads = 1;
5✔
2638
  rpcInit.cfp = NULL;
5✔
2639
  rpcInit.sessions = 16;
5✔
2640
  rpcInit.connType = TAOS_CONN_CLIENT;
5✔
2641
  rpcInit.idleTime = tsShellActivityTimer * 1000;
5✔
2642
  rpcInit.compressSize = tsCompressMsgSize;
5✔
2643
  rpcInit.user = "_dnd";
5✔
2644

2645
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
5✔
2646
  connLimitNum = TMAX(connLimitNum, 10);
5✔
2647
  connLimitNum = TMIN(connLimitNum, 500);
5✔
2648
  rpcInit.connLimitNum = connLimitNum;
5✔
2649
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
5✔
2650
  rpcInit.readTimeout = tsReadTimeout;
5✔
2651
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
5!
UNCOV
2652
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
UNCOV
2653
    goto _OVER;
×
2654
  }
2655

2656
  clientRpc = rpcOpen(&rpcInit);
5✔
2657
  if (clientRpc == NULL) {
5!
2658
    code = terrno;
×
UNCOV
2659
    tscError("failed to init server status client since %s", tstrerror(code));
×
UNCOV
2660
    goto _OVER;
×
2661
  }
2662

2663
  if (fqdn == NULL) {
5!
2664
    fqdn = tsLocalFqdn;
5✔
2665
  }
2666

2667
  if (port == 0) {
5!
2668
    port = tsServerPort;
5✔
2669
  }
2670

2671
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
5✔
2672
  epSet.eps[0].port = (uint16_t)port;
5✔
2673
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
5✔
2674
  if (TSDB_CODE_SUCCESS != ret) {
5!
UNCOV
2675
    tscError("failed to send recv since %s", tstrerror(ret));
×
UNCOV
2676
    goto _OVER;
×
2677
  }
2678

2679
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
5!
2680
    tscError("failed to send server status req since %s", terrstr());
1!
2681
    goto _OVER;
1✔
2682
  }
2683

2684
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
4!
UNCOV
2685
    tscError("failed to parse server status rsp since %s", terrstr());
×
UNCOV
2686
    goto _OVER;
×
2687
  }
2688

2689
  code = statusRsp.statusCode;
4✔
2690
  if (details != NULL) {
4!
2691
    tstrncpy(details, statusRsp.details, maxlen);
4✔
2692
  }
2693

UNCOV
2694
_OVER:
×
2695
  if (clientRpc != NULL) {
5!
2696
    rpcClose(clientRpc);
5✔
2697
  }
2698
  if (rpcRsp.pCont != NULL) {
5✔
2699
    rpcFreeCont(rpcRsp.pCont);
4✔
2700
  }
2701
  return code;
5✔
2702
}
2703

2704
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
3✔
2705
                      int32_t acctId, char* db) {
2706
  SName name = {0};
3✔
2707

2708
  if (len1 <= 0) {
3!
2709
    return -1;
×
2710
  }
2711

2712
  const char* dbName = db;
3✔
2713
  const char* tbName = NULL;
3✔
2714
  int32_t     dbLen = 0;
3✔
2715
  int32_t     tbLen = 0;
3✔
2716
  if (len2 > 0) {
3!
UNCOV
2717
    dbName = str + pos1;
×
UNCOV
2718
    dbLen = len1;
×
2719
    tbName = str + pos2;
×
UNCOV
2720
    tbLen = len2;
×
2721
  } else {
2722
    dbLen = strlen(db);
3✔
2723
    tbName = str + pos1;
3✔
2724
    tbLen = len1;
3✔
2725
  }
2726

2727
  if (dbLen <= 0 || tbLen <= 0) {
3!
UNCOV
2728
    return -1;
×
2729
  }
2730

2731
  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
3!
UNCOV
2732
    return -1;
×
2733
  }
2734

2735
  if (tNameAddTbName(&name, tbName, tbLen)) {
3!
UNCOV
2736
    return -1;
×
2737
  }
2738

2739
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
3✔
2740
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);
3✔
2741

2742
  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
3✔
2743
  if (pDb) {
3!
UNCOV
2744
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
×
UNCOV
2745
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
2746
    }
2747
  } else {
2748
    STablesReq db;
2749
    db.pTables = taosArrayInit(20, sizeof(SName));
3✔
2750
    if (NULL == db.pTables) {
3!
UNCOV
2751
      return terrno;
×
2752
    }
2753
    tstrncpy(db.dbFName, dbFName, TSDB_DB_FNAME_LEN);
3✔
2754
    if (NULL == taosArrayPush(db.pTables, &name)) {
6!
UNCOV
2755
      return terrno;
×
2756
    }
2757
    TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
3!
2758
  }
2759

2760
  return TSDB_CODE_SUCCESS;
3✔
2761
}
2762

2763
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
3✔
2764
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
3✔
2765
  if (NULL == pHash) {
3!
UNCOV
2766
    return terrno;
×
2767
  }
2768

2769
  bool    inEscape = false;
3✔
2770
  int32_t code = 0;
3✔
2771
  void*   pIter = NULL;
3✔
2772

2773
  int32_t vIdx = 0;
3✔
2774
  int32_t vPos[2];
2775
  int32_t vLen[2];
2776

2777
  (void)memset(vPos, -1, sizeof(vPos));
3✔
2778
  (void)memset(vLen, 0, sizeof(vLen));
3✔
2779

2780
  for (int32_t i = 0;; ++i) {
12✔
2781
    if (0 == *(tbList + i)) {
12✔
2782
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
3!
2783
        vLen[vIdx] = i - vPos[vIdx];
3✔
2784
      }
2785

2786
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
3✔
2787
      if (code) {
3!
UNCOV
2788
        goto _return;
×
2789
      }
2790

2791
      break;
3✔
2792
    }
2793

2794
    if ('`' == *(tbList + i)) {
9!
UNCOV
2795
      inEscape = !inEscape;
×
2796
      if (!inEscape) {
×
2797
        if (vPos[vIdx] >= 0) {
×
UNCOV
2798
          vLen[vIdx] = i - vPos[vIdx];
×
2799
        } else {
2800
          goto _return;
×
2801
        }
2802
      }
2803

UNCOV
2804
      continue;
×
2805
    }
2806

2807
    if (inEscape) {
9!
2808
      if (vPos[vIdx] < 0) {
×
UNCOV
2809
        vPos[vIdx] = i;
×
2810
      }
UNCOV
2811
      continue;
×
2812
    }
2813

2814
    if ('.' == *(tbList + i)) {
9!
2815
      if (vPos[vIdx] < 0) {
×
UNCOV
2816
        goto _return;
×
2817
      }
UNCOV
2818
      if (vLen[vIdx] <= 0) {
×
2819
        vLen[vIdx] = i - vPos[vIdx];
×
2820
      }
2821
      vIdx++;
×
UNCOV
2822
      if (vIdx >= 2) {
×
UNCOV
2823
        goto _return;
×
2824
      }
UNCOV
2825
      continue;
×
2826
    }
2827

2828
    if (',' == *(tbList + i)) {
9!
2829
      if (vPos[vIdx] < 0) {
×
UNCOV
2830
        goto _return;
×
2831
      }
UNCOV
2832
      if (vLen[vIdx] <= 0) {
×
UNCOV
2833
        vLen[vIdx] = i - vPos[vIdx];
×
2834
      }
2835

UNCOV
2836
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
×
UNCOV
2837
      if (code) {
×
UNCOV
2838
        goto _return;
×
2839
      }
2840

2841
      (void)memset(vPos, -1, sizeof(vPos));
×
UNCOV
2842
      (void)memset(vLen, 0, sizeof(vLen));
×
UNCOV
2843
      vIdx = 0;
×
UNCOV
2844
      continue;
×
2845
    }
2846

2847
    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
9!
UNCOV
2848
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
×
UNCOV
2849
        vLen[vIdx] = i - vPos[vIdx];
×
2850
      }
UNCOV
2851
      continue;
×
2852
    }
2853

2854
    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
9!
2855
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
×
2856
      if (vLen[vIdx] > 0) {
9!
2857
        goto _return;
×
2858
      }
2859
      if (vPos[vIdx] < 0) {
9✔
2860
        vPos[vIdx] = i;
3✔
2861
      }
2862
      continue;
9✔
2863
    }
2864

UNCOV
2865
    goto _return;
×
2866
  }
2867

2868
  int32_t dbNum = taosHashGetSize(pHash);
3✔
2869
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
3✔
2870
  if (NULL == pReq) {
3!
2871
    TSC_ERR_JRET(terrno);
×
2872
  }
2873
  pIter = taosHashIterate(pHash, NULL);
3✔
2874
  while (pIter) {
6✔
2875
    STablesReq* pDb = (STablesReq*)pIter;
3✔
2876
    if (NULL == taosArrayPush(*pReq, pDb)) {
6!
UNCOV
2877
      TSC_ERR_JRET(terrno);
×
2878
    }
2879
    pIter = taosHashIterate(pHash, pIter);
3✔
2880
  }
2881

2882
  taosHashCleanup(pHash);
3✔
2883

2884
  return TSDB_CODE_SUCCESS;
3✔
2885

UNCOV
2886
_return:
×
2887

UNCOV
2888
  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
×
2889

UNCOV
2890
  pIter = taosHashIterate(pHash, NULL);
×
UNCOV
2891
  while (pIter) {
×
2892
    STablesReq* pDb = (STablesReq*)pIter;
×
2893
    taosArrayDestroy(pDb->pTables);
×
2894
    pIter = taosHashIterate(pHash, pIter);
×
2895
  }
2896

2897
  taosHashCleanup(pHash);
×
2898

UNCOV
2899
  return terrno;
×
2900
}
2901

2902
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
3✔
2903
  SSyncQueryParam* pParam = param;
3✔
2904
  pParam->pRequest->code = code;
3✔
2905

2906
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
3!
UNCOV
2907
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2908
  }
2909
}
3✔
2910

2911
void syncQueryFn(void* param, void* res, int32_t code) {
10,716,248✔
2912
  SSyncQueryParam* pParam = param;
10,716,248✔
2913
  pParam->pRequest = res;
10,716,248✔
2914

2915
  if (pParam->pRequest) {
10,716,248!
2916
    pParam->pRequest->code = code;
10,716,327✔
2917
    clientOperateReport(pParam->pRequest);
10,716,327✔
2918
  }
2919

2920
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
10,717,769!
UNCOV
2921
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2922
  }
2923
}
10,725,487✔
2924

2925
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
10,715,633✔
2926
                        int8_t source) {
2927
  if (sql == NULL || NULL == fp) {
10,715,633!
2928
    terrno = TSDB_CODE_INVALID_PARA;
×
UNCOV
2929
    if (fp) {
×
UNCOV
2930
      fp(param, NULL, terrno);
×
2931
    }
2932

2933
    return;
×
2934
  }
2935

2936
  size_t sqlLen = strlen(sql);
10,722,963✔
2937
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
10,722,963!
UNCOV
2938
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
×
UNCOV
2939
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
UNCOV
2940
    fp(param, NULL, terrno);
×
UNCOV
2941
    return;
×
2942
  }
2943

2944
  SRequestObj* pRequest = NULL;
10,722,963✔
2945
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
10,722,963✔
2946
  if (code != TSDB_CODE_SUCCESS) {
10,715,019!
UNCOV
2947
    terrno = code;
×
UNCOV
2948
    fp(param, NULL, terrno);
×
UNCOV
2949
    return;
×
2950
  }
2951

2952
  pRequest->source = source;
10,715,019✔
2953
  pRequest->body.queryFp = fp;
10,715,019✔
2954
  doAsyncQuery(pRequest, false);
10,715,019✔
2955
}
2956
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
1✔
2957
                                 int64_t reqid) {
2958
  if (sql == NULL || NULL == fp) {
1!
UNCOV
2959
    terrno = TSDB_CODE_INVALID_PARA;
×
UNCOV
2960
    if (fp) {
×
2961
      fp(param, NULL, terrno);
×
2962
    }
2963

UNCOV
2964
    return;
×
2965
  }
2966

2967
  size_t sqlLen = strlen(sql);
1✔
2968
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
1!
UNCOV
2969
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
×
UNCOV
2970
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
UNCOV
2971
    fp(param, NULL, terrno);
×
2972
    return;
×
2973
  }
2974

2975
  SRequestObj* pRequest = NULL;
1✔
2976
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
1✔
2977
  if (code != TSDB_CODE_SUCCESS) {
1!
UNCOV
2978
    terrno = code;
×
UNCOV
2979
    fp(param, NULL, terrno);
×
UNCOV
2980
    return;
×
2981
  }
2982

2983
  pRequest->body.queryFp = fp;
1✔
2984
  doAsyncQuery(pRequest, false);
1✔
2985
}
2986

2987
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
10,714,771✔
2988
  if (NULL == taos) {
10,714,771!
UNCOV
2989
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
UNCOV
2990
    return NULL;
×
2991
  }
2992

2993
  tscDebug("connObj:0x%" PRIx64 ", taos_query start with sql:%s", *(int64_t*)taos, sql);
10,714,771✔
2994

2995
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
10,714,771!
2996
  if (NULL == param) {
10,722,605!
UNCOV
2997
    return NULL;
×
2998
  }
2999
  int32_t code = tsem_init(&param->sem, 0, 0);
10,722,605✔
3000
  if (TSDB_CODE_SUCCESS != code) {
10,721,939!
3001
    taosMemoryFree(param);
×
UNCOV
3002
    return NULL;
×
3003
  }
3004

3005
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
10,721,939✔
3006
  code = tsem_wait(&param->sem);
10,681,514✔
3007
  if (TSDB_CODE_SUCCESS != code) {
10,720,804!
UNCOV
3008
    taosMemoryFree(param);
×
UNCOV
3009
    return NULL;
×
3010
  }
3011
  code = tsem_destroy(&param->sem);
10,720,804✔
3012
  if (TSDB_CODE_SUCCESS != code) {
10,717,804!
3013
    tscError("failed to destroy semaphore since %s", tstrerror(code));
×
3014
  }
3015

3016
  SRequestObj* pRequest = NULL;
10,721,988✔
3017
  if (param->pRequest != NULL) {
10,721,988!
3018
    param->pRequest->syncQuery = true;
10,721,988✔
3019
    pRequest = param->pRequest;
10,721,988✔
3020
    param->pRequest->inCallback = false;
10,721,988✔
3021
  }
3022
  taosMemoryFree(param);
10,721,988!
3023

3024
  tscDebug("connObj:0x%" PRIx64 ", res:%p created, taos_query end", *(int64_t*)taos, pRequest);
10,718,366✔
3025

3026
  return pRequest;
10,720,552✔
3027
}
3028

3029
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
1✔
3030
  if (NULL == taos) {
1!
UNCOV
3031
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
UNCOV
3032
    return NULL;
×
3033
  }
3034

3035
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
1!
3036
  if (param == NULL) {
1!
3037
    return NULL;
×
3038
  }
3039
  int32_t code = tsem_init(&param->sem, 0, 0);
1✔
3040
  if (TSDB_CODE_SUCCESS != code) {
1!
UNCOV
3041
    taosMemoryFree(param);
×
UNCOV
3042
    return NULL;
×
3043
  }
3044

3045
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
1✔
3046
  code = tsem_wait(&param->sem);
1✔
3047
  if (TSDB_CODE_SUCCESS != code) {
1!
UNCOV
3048
    taosMemoryFree(param);
×
UNCOV
3049
    return NULL;
×
3050
  }
3051
  SRequestObj* pRequest = NULL;
1✔
3052
  if (param->pRequest != NULL) {
1!
3053
    param->pRequest->syncQuery = true;
1✔
3054
    pRequest = param->pRequest;
1✔
3055
  }
3056
  taosMemoryFree(param);
1!
3057
  return pRequest;
1✔
3058
}
3059

3060
static void fetchCallback(void* pResult, void* param, int32_t code) {
857,145✔
3061
  SRequestObj* pRequest = (SRequestObj*)param;
857,145✔
3062

3063
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
857,145✔
3064

3065
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
857,145✔
3066
           pRequest->requestId);
3067

3068
  pResultInfo->pData = pResult;
857,138✔
3069
  pResultInfo->numOfRows = 0;
857,138✔
3070

3071
  if (code != TSDB_CODE_SUCCESS) {
857,138!
UNCOV
3072
    pRequest->code = code;
×
UNCOV
3073
    taosMemoryFreeClear(pResultInfo->pData);
×
UNCOV
3074
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
UNCOV
3075
    return;
×
3076
  }
3077

3078
  if (pRequest->code != TSDB_CODE_SUCCESS) {
857,138!
UNCOV
3079
    taosMemoryFreeClear(pResultInfo->pData);
×
UNCOV
3080
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
UNCOV
3081
    return;
×
3082
  }
3083

3084
  pRequest->code =
857,152✔
3085
      setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, pRequest->isStmtBind);
857,138✔
3086
  if (pRequest->code != TSDB_CODE_SUCCESS) {
857,152!
UNCOV
3087
    pResultInfo->numOfRows = 0;
×
UNCOV
3088
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(pRequest->code),
×
3089
             pRequest->requestId);
3090
  } else {
3091
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
857,152✔
3092
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
3093
             pRequest->requestId);
3094

3095
    STscObj*            pTscObj = pRequest->pTscObj;
857,152✔
3096
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
857,152✔
3097
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
857,152✔
3098
  }
3099

3100
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
857,186✔
3101
}
3102

3103
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
935,028✔
3104
  pRequest->body.fetchFp = fp;
935,028✔
3105
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
935,028✔
3106

3107
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
935,028✔
3108

3109
  // this query has no results or error exists, return directly
3110
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
935,028!
3111
    pResultInfo->numOfRows = 0;
1✔
3112
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
1✔
3113
    return;
77,839✔
3114
  }
3115

3116
  // all data has returned to App already, no need to try again
3117
  if (pResultInfo->completed) {
935,028✔
3118
    // it is a local executed query, no need to do async fetch
3119
    if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
77,838✔
3120
      if (pResultInfo->localResultFetched) {
2,664✔
3121
        pResultInfo->numOfRows = 0;
1,332✔
3122
        pResultInfo->current = 0;
1,332✔
3123
      } else {
3124
        pResultInfo->localResultFetched = true;
1,332✔
3125
      }
3126
    } else {
3127
      pResultInfo->numOfRows = 0;
75,174✔
3128
    }
3129

3130
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
77,838✔
3131
    return;
77,838✔
3132
  }
3133

3134
  SSchedulerReq req = {
857,190✔
3135
      .syncReq = false,
3136
      .fetchFp = fetchCallback,
3137
      .cbParam = pRequest,
3138
  };
3139

3140
  int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
857,190✔
3141
  if (TSDB_CODE_SUCCESS != code) {
857,184!
UNCOV
3142
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
×
3143
    // pRequest->body.fetchFp(param, pRequest, code);
3144
  }
3145
}
3146

3147
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
10,717,828✔
3148
  pRequest->inCallback = true;
10,717,828✔
3149
  int64_t this = pRequest->self;
10,717,828✔
3150
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
10,717,828!
3151
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
316!
UNCOV
3152
    code = TSDB_CODE_SUCCESS;
×
UNCOV
3153
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
3154
  }
3155
  if (pRequest->body.queryFp != NULL) {
10,717,828!
3156
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
10,720,670✔
3157
  }
3158
  SRequestObj* pReq = acquireRequest(this);
10,721,685✔
3159
  if (pReq != NULL) {
10,725,306✔
3160
    pReq->inCallback = false;
10,718,230✔
3161
    (void)releaseRequest(this);
10,718,230✔
3162
  }
3163
}
10,719,898✔
3164

3165
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
915✔
3166
                       SParseSqlRes* pRes) {
3167
#ifndef TD_ENTERPRISE
3168
  return TSDB_CODE_SUCCESS;
3169
#else
3170
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
915✔
3171
#endif
3172
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc