• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4819

20 Oct 2025 02:42AM UTC coverage: 61.392% (+0.3%) from 61.125%
#4819

push

travis-ci

happyguoxy
add coverage result json

156672 of 324369 branches covered (48.3%)

Branch coverage included in aggregate %.

207936 of 269535 relevant lines covered (77.15%)

242336611.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.51
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "decimal.h"
22
#include "scheduler.h"
23
#include "tdatablock.h"
24
#include "tdataformat.h"
25
#include "tdef.h"
26
#include "tglobal.h"
27
#include "tmisce.h"
28
#include "tmsg.h"
29
#include "tmsgtype.h"
30
#include "tpagedbuf.h"
31
#include "tref.h"
32
#include "tsched.h"
33
#include "tversion.h"
34

35
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
36
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
37

38
void setQueryRequest(int64_t rId) {
879,352,331✔
39
  SRequestObj* pReq = acquireRequest(rId);
879,352,331✔
40
  if (pReq != NULL) {
879,357,586✔
41
    pReq->isQuery = true;
879,330,646✔
42
    (void)releaseRequest(rId);
879,329,974✔
43
  }
44
}
879,355,982✔
45

46
static bool stringLengthCheck(const char* str, size_t maxsize) {
31,951,206✔
47
  if (str == NULL) {
31,951,206!
48
    return false;
×
49
  }
50

51
  size_t len = strlen(str);
31,951,206!
52
  if (len <= 0 || len > maxsize) {
31,951,206✔
53
    return false;
205✔
54
  }
55

56
  return true;
31,952,859✔
57
}
58

59
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
13,424,731✔
60

61
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_MAX_LEN); }
13,421,464✔
62

63
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
5,105,643✔
64

65
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
13,417,292✔
66
  char key[512] = {0};
13,417,292✔
67
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
13,419,586!
68
  return taosStrdup(key);
13,419,586!
69
}
70

71
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
3,076,681✔
72
  if (dst == NULL || src == NULL || srcLength == 0) {
3,076,681!
73
    return 0;
2,301✔
74
  }
75
  
76
  size_t escapeLength = 0;
3,074,380✔
77
  for(size_t i = 0; i < srcLength; ++i) {
93,610,532✔
78
    if( src[i] == '\"' || src[i] == '\\' || src[i] == '\b' || src[i] == '\f' || src[i] == '\n' ||
90,536,152!
79
        src[i] == '\r' || src[i] == '\t') {
90,531,634✔
80
      escapeLength += 1; 
7,530✔
81
    }    
82
  }
83

84
  size_t dstLength = srcLength;
3,074,380✔
85
  if(escapeLength == 0) {
3,074,380✔
86
     (void)memcpy(dst, src, srcLength);
3,066,850!
87
  } else {
88
    dstLength = 0;
7,530✔
89
    for(size_t i = 0; i < srcLength && dstLength <= maxDstLength; i++) {
82,830!
90
      switch(src[i]) {
75,300!
91
        case '\"':
×
92
          dst[dstLength++] = '\\';
×
93
          dst[dstLength++] = '\"';
×
94
          break;
×
95
        case '\\':
×
96
          dst[dstLength++] = '\\';
×
97
          dst[dstLength++] = '\\';
×
98
          break;
×
99
        case '\b':
1,506✔
100
          dst[dstLength++] = '\\';
1,506✔
101
          dst[dstLength++] = 'b';
1,506✔
102
          break;
1,506✔
103
        case '\f':
1,506✔
104
          dst[dstLength++] = '\\';
1,506✔
105
          dst[dstLength++] = 'f';
1,506✔
106
          break;
1,506✔
107
        case '\n':
1,506✔
108
          dst[dstLength++] = '\\';
1,506✔
109
          dst[dstLength++] = 'n';
1,506✔
110
          break;
1,506✔
111
        case '\r':
1,506✔
112
          dst[dstLength++] = '\\';
1,506✔
113
          dst[dstLength++] = 'r';
1,506✔
114
          break;
1,506✔
115
        case '\t':
1,506✔
116
          dst[dstLength++] = '\\';
1,506✔
117
          dst[dstLength++] = 't';
1,506✔
118
          break;
1,506✔
119
        default:
67,770✔
120
           dst[dstLength++] = src[i];
67,770✔
121
      }
122
    }
123
  }
124

125
  return dstLength;
3,074,380✔
126
}
127

128
bool chkRequestKilled(void* param) {
2,147,483,647✔
129
  bool         killed = false;
2,147,483,647✔
130
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
131
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647!
132
    killed = true;
×
133
  }
134

135
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
136

137
  return killed;
2,147,483,647✔
138
}
139

140
void cleanupAppInfo() {
6,505,493✔
141
  taosHashCleanup(appInfo.pInstMap);
6,505,493✔
142
  taosHashCleanup(appInfo.pInstMapByClusterId);
6,505,493✔
143
  tscInfo("cluster instance map cleaned");
6,505,493!
144
}
6,505,493✔
145

146
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
147
                               SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
148

149
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
13,426,020✔
150
                              uint16_t port, int connType, STscObj** pObj) {
151
  TSC_ERR_RET(taos_init());
13,426,020!
152
  if (!validateUserName(user)) {
13,426,162!
153
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
×
154
  }
155
  int32_t code = 0;
13,424,731✔
156

157
  char localDb[TSDB_DB_NAME_LEN] = {0};
13,424,731✔
158
  if (db != NULL && strlen(db) > 0) {
13,424,731✔
159
    if (!validateDbName(db)) {
5,105,643!
160
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
×
161
    }
162

163
    tstrncpy(localDb, db, sizeof(localDb));
5,106,869!
164
    (void)strdequote(localDb);
5,106,869✔
165
  }
166

167
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
13,424,589✔
168
  if (auth == NULL) {
13,424,589✔
169
    if (!validatePassword(pass)) {
13,421,322!
170
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
×
171
    }
172

173
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
13,420,307!
174
  } else {
175
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
3,267!
176
  }
177

178
  SCorEpSet epSet = {0};
13,424,741✔
179
  if (ip) {
13,423,594✔
180
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
4,176,986✔
181
  } else {
182
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
9,246,608!
183
  }
184

185
  if (port) {
13,418,904✔
186
    epSet.epSet.eps[0].port = port;
444,064✔
187
    epSet.epSet.eps[1].port = port;
444,064✔
188
  }
189

190
  char* key = getClusterKey(user, secretEncrypt, ip, port);
13,418,904✔
191
  if (NULL == key) {
13,417,950!
192
    TSC_ERR_RET(terrno);
×
193
  }
194
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
13,417,950✔
195
          user, db, key);
196
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
36,092,727✔
197
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
22,669,653✔
198
  }
199
  // for (int32_t i = 0; i < epSet.epSet.numOfEps; i++) {
200
  //   if ((code = taosValidFqdn(tsEnableIpv6, epSet.epSet.eps[i].fqdn)) != 0) {
201
  //     taosMemFree(key);
202
  //     tscError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6,
203
  //              epSet.epSet.eps[i].fqdn, tstrerror(code));
204
  //     TSC_ERR_RET(code);
205
  //   }
206
  // }
207

208
  SAppInstInfo** pInst = NULL;
13,423,074✔
209
  code = taosThreadMutexLock(&appInfo.mutex);
13,423,074✔
210
  if (TSDB_CODE_SUCCESS != code) {
13,422,932!
211
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
212
    TSC_ERR_RET(code);
×
213
  }
214

215
  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
13,422,932!
216
  SAppInstInfo* p = NULL;
13,422,932✔
217
  if (pInst == NULL) {
13,422,932✔
218
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
6,771,946!
219
    if (NULL == p) {
6,771,946!
220
      TSC_ERR_JRET(terrno);
×
221
    }
222
    p->mgmtEp = epSet;
6,771,946✔
223
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
6,771,946✔
224
    if (TSDB_CODE_SUCCESS != code) {
6,771,946!
225
      taosMemoryFree(p);
×
226
      TSC_ERR_JRET(code);
×
227
    }
228
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
6,771,946!
229
    if (TSDB_CODE_SUCCESS != code) {
6,771,946!
230
      taosMemoryFree(p);
×
231
      TSC_ERR_JRET(code);
×
232
    }
233
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
6,771,946✔
234
    if (TSDB_CODE_SUCCESS != code) {
6,771,946!
235
      destroyAppInst(&p);
×
236
      TSC_ERR_JRET(code);
×
237
    }
238
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
6,771,946!
239
    if (TSDB_CODE_SUCCESS != code) {
6,771,946!
240
      destroyAppInst(&p);
×
241
      TSC_ERR_JRET(code);
×
242
    }
243
    p->instKey = key;
6,771,946✔
244
    key = NULL;
6,771,946✔
245
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
6,771,946!
246

247
    pInst = &p;
6,771,946✔
248
  } else {
249
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
6,650,986!
250
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
×
251
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
×
252
    }
253
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
254
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
6,650,986✔
255
  }
256

257
_return:
13,422,932✔
258

259
  if (TSDB_CODE_SUCCESS != code) {
13,422,932!
260
    (void)taosThreadMutexUnlock(&appInfo.mutex);
×
261
    taosMemoryFreeClear(key);
×
262
    return code;
×
263
  } else {
264
    code = taosThreadMutexUnlock(&appInfo.mutex);
13,422,932✔
265
    taosMemoryFreeClear(key);
13,422,932!
266
    if (TSDB_CODE_SUCCESS != code) {
13,422,932!
267
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
268
      return code;
×
269
    }
270
    return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType, pObj);
13,422,932✔
271
  }
272
}
273

274
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
275
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
276
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
277
//     return *ppAppInstInfo;
278
//   } else {
279
//     return NULL;
280
//   }
281
// }
282

283
void freeQueryParam(SSyncQueryParam* param) {
1,586,742✔
284
  if (param == NULL) return;
1,586,742!
285
  if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
1,586,742!
286
    tscError("failed to destroy semaphore in freeQueryParam");
×
287
  }
288
  taosMemoryFree(param);
1,586,742!
289
}
290

291
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
2,147,483,647✔
292
                     SRequestObj** pRequest, int64_t reqid) {
293
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
2,147,483,647✔
294
  if (TSDB_CODE_SUCCESS != code) {
2,147,483,647!
295
    tscError("failed to malloc sqlObj, %s", sql);
×
296
    return code;
×
297
  }
298

299
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
2,147,483,647!
300
  if ((*pRequest)->sqlstr == NULL) {
2,147,483,647!
301
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
302
    destroyRequest(*pRequest);
×
303
    *pRequest = NULL;
×
304
    return terrno;
×
305
  }
306

307
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
2,147,483,647✔
308
  (*pRequest)->sqlstr[sqlLen] = 0;
2,147,483,647✔
309
  (*pRequest)->sqlLen = sqlLen;
2,147,483,647✔
310
  (*pRequest)->validateOnly = validateSql;
2,147,483,647✔
311
  (*pRequest)->stmtBindVersion = 0;
2,147,483,647✔
312

313
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
2,147,483,647✔
314

315
  STscObj* pTscObj = (*pRequest)->pTscObj;
2,147,483,647✔
316
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
2,147,483,647✔
317
                             sizeof((*pRequest)->self));
318
  if (err) {
2,147,483,647!
319
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
320
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
321
    destroyRequest(*pRequest);
×
322
    *pRequest = NULL;
×
323
    return terrno;
×
324
  }
325

326
  (*pRequest)->allocatorRefId = -1;
2,147,483,647✔
327
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
2,147,483,647✔
328
    if (TSDB_CODE_SUCCESS !=
813,541,684!
329
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
813,526,781✔
330
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
331
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
332
      destroyRequest(*pRequest);
×
333
      *pRequest = NULL;
×
334
      return terrno;
×
335
    }
336
  }
337

338
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
2,147,483,647✔
339
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
340
}
341

342
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
×
343
  int32_t code =
344
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
×
345
  if (TSDB_CODE_SUCCESS == code) {
×
346
    pRequest->relation.prevRefId = (*pNewRequest)->self;
×
347
    (*pNewRequest)->relation.nextRefId = pRequest->self;
×
348
    (*pNewRequest)->relation.userRefId = pRequest->self;
×
349
    (*pNewRequest)->isSubReq = true;
×
350
  }
351
  return code;
×
352
}
353

354
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
9,641,747✔
355
  STscObj* pTscObj = pRequest->pTscObj;
9,641,747✔
356

357
  SParseContext cxt = {
9,642,203✔
358
      .requestId = pRequest->requestId,
9,640,549✔
359
      .requestRid = pRequest->self,
9,641,005✔
360
      .acctId = pTscObj->acctId,
9,641,747✔
361
      .db = pRequest->pDb,
9,636,527✔
362
      .topicQuery = topicQuery,
363
      .pSql = pRequest->sqlstr,
9,639,403✔
364
      .sqlLen = pRequest->sqlLen,
9,641,069✔
365
      .pMsg = pRequest->msgBuf,
9,641,890✔
366
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
367
      .pTransporter = pTscObj->pAppInfo->pTransporter,
9,638,102✔
368
      .pStmtCb = pStmtCb,
369
      .pUser = pTscObj->user,
9,640,315✔
370
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
9,642,203!
371
      .enableSysInfo = pTscObj->sysInfo,
9,639,716✔
372
      .svrVer = pTscObj->sVer,
9,642,580✔
373
      .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
9,642,580✔
374
      .stmtBindVersion = pRequest->stmtBindVersion,
9,643,933✔
375
      .setQueryFp = setQueryRequest,
376
      .timezone = pTscObj->optionInfo.timezone,
9,639,768✔
377
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
9,638,610✔
378
  };
379

380
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
9,638,063✔
381
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
9,643,413✔
382
  if (code != TSDB_CODE_SUCCESS) {
9,642,580!
383
    return code;
×
384
  }
385

386
  code = qParseSql(&cxt, pQuery);
9,642,580✔
387
  if (TSDB_CODE_SUCCESS == code) {
9,632,300✔
388
    if ((*pQuery)->haveResultSet) {
9,629,577!
389
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols,
×
390
                              (*pQuery)->pResExtSchema, pRequest->stmtBindVersion > 0);
×
391
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
×
392
    }
393
  }
394

395
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
9,635,083!
396
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
9,629,699✔
397
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
9,625,179✔
398
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
9,622,459✔
399
  }
400

401
  taosArrayDestroy(cxt.pTableMetaPos);
9,622,776✔
402
  taosArrayDestroy(cxt.pTableVgroupPos);
9,622,638✔
403

404
  return code;
9,633,275✔
405
}
406

407
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
×
408
  SRetrieveTableRsp* pRsp = NULL;
×
409
  int8_t             biMode = atomic_load_8(&pRequest->pTscObj->biMode);
×
410
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode,
×
411
                              pRequest->pTscObj->optionInfo.charsetCxt);
×
412
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
×
413
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
×
414
                                 pRequest->stmtBindVersion > 0);
×
415
  }
416

417
  return code;
×
418
}
419

420
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
1,239,589✔
421
  // drop table if exists not_exists_table
422
  if (NULL == pQuery->pCmdMsg) {
1,239,589!
423
    return TSDB_CODE_SUCCESS;
×
424
  }
425

426
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
1,239,589✔
427
  pRequest->type = pMsgInfo->msgType;
1,239,589✔
428
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
1,239,589✔
429
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
1,238,414✔
430

431
  STscObj*      pTscObj = pRequest->pTscObj;
1,239,589✔
432
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
1,239,589✔
433

434
  // int64_t transporterId = 0;
435
  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
1,239,589!
436
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
1,239,589!
437
  return TSDB_CODE_SUCCESS;
1,239,589✔
438
}
439

440
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
2,147,483,647✔
441

442
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
19,880,398✔
443
  SRetrieveTableRsp* pRsp = NULL;
19,880,398✔
444
  if (pRequest->validateOnly) {
19,880,398!
445
    doRequestCallback(pRequest, 0);
37,260✔
446
    return;
37,260✔
447
  }
448

449
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
39,626,034✔
450
                              atomic_load_8(&pRequest->pTscObj->biMode), pRequest->pTscObj->optionInfo.charsetCxt);
39,626,034✔
451
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
19,843,138✔
452
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
12,621,241!
453
                                 pRequest->stmtBindVersion > 0);
12,621,241✔
454
  }
455

456
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
19,843,138✔
457
  pRequest->code = code;
19,843,138✔
458

459
  if (pRequest->code != TSDB_CODE_SUCCESS) {
19,843,138✔
460
    pResultInfo->numOfRows = 0;
12,844✔
461
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
12,844!
462
             pRequest->requestId);
463
  } else {
464
    tscDebug(
19,830,294!
465
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
466
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
467
  }
468

469
  doRequestCallback(pRequest, code);
19,843,138✔
470
}
471

472
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
45,459,611✔
473
  if (pRequest->validateOnly) {
45,459,611!
474
    doRequestCallback(pRequest, 0);
×
475
    return TSDB_CODE_SUCCESS;
×
476
  }
477

478
  // drop table if exists not_exists_table
479
  if (NULL == pQuery->pCmdMsg) {
45,459,611✔
480
    doRequestCallback(pRequest, 0);
20,501✔
481
    return TSDB_CODE_SUCCESS;
20,501✔
482
  }
483

484
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
45,437,921✔
485
  pRequest->type = pMsgInfo->msgType;
45,439,566✔
486
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
45,439,082✔
487
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
45,440,633✔
488

489
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
45,439,928✔
490
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
45,440,982✔
491

492
  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
45,442,049✔
493
  if (code) {
45,442,754!
494
    doRequestCallback(pRequest, code);
×
495
  }
496
  return code;
45,442,754✔
497
}
498

499
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
701,076✔
500
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
701,076✔
501
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
701,076✔
502

503
  if (node1->load < node2->load) {
701,076!
504
    return -1;
×
505
  }
506

507
  return node1->load > node2->load;
701,076✔
508
}
509

510
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
18,261,387✔
511
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
18,261,387!
512
  if (pInfo->pQnodeList) {
18,261,387✔
513
    taosArrayDestroy(pInfo->pQnodeList);
17,990,089✔
514
    pInfo->pQnodeList = NULL;
17,990,089✔
515
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
17,990,089✔
516
  }
517

518
  if (pNodeList) {
18,261,387!
519
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
18,261,387✔
520
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
18,261,387✔
521
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
18,261,387✔
522
             taosArrayGetSize(pInfo->pQnodeList));
523
  }
524
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
18,261,387!
525

526
  return TSDB_CODE_SUCCESS;
18,261,387✔
527
}
528

529
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
2,147,483,647✔
530
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
2,147,483,647✔
531
    *required = false;
2,147,483,647✔
532
    return TSDB_CODE_SUCCESS;
2,147,483,647✔
533
  }
534

535
  int32_t       code = TSDB_CODE_SUCCESS;
221,848,262✔
536
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
221,848,262✔
537
  *required = false;
221,848,972✔
538

539
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
221,847,551!
540
  *required = (NULL == pInfo->pQnodeList);
221,850,394✔
541
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
221,850,394!
542
  return TSDB_CODE_SUCCESS;
221,848,261✔
543
}
544

545
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
×
546
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
×
547
  int32_t       code = 0;
×
548

549
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
×
550
  if (pInfo->pQnodeList) {
×
551
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
×
552
  }
553
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
×
554
  if (NULL == *pNodeList) {
×
555
    SCatalog* pCatalog = NULL;
×
556
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
557
    if (TSDB_CODE_SUCCESS == code) {
×
558
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
×
559
      if (NULL == pNodeList) {
×
560
        TSC_ERR_RET(terrno);
×
561
      }
562
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
563
                               .requestId = pRequest->requestId,
×
564
                               .requestObjRefId = pRequest->self,
×
565
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
566
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
×
567
    }
568

569
    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
×
570
      code = updateQnodeList(pInfo, *pNodeList);
×
571
    }
572
  }
573

574
  return code;
×
575
}
576

577
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
23,650,224✔
578
  pRequest->type = pQuery->msgType;
23,650,224✔
579
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
23,652,241✔
580

581
  SPlanContext cxt = {.queryId = pRequest->requestId,
25,020,744✔
582
                      .acctId = pRequest->pTscObj->acctId,
23,645,698✔
583
                      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
23,637,950✔
584
                      .pAstRoot = pQuery->pRoot,
23,653,358✔
585
                      .showRewrite = pQuery->showRewrite,
23,654,191!
586
                      .pMsg = pRequest->msgBuf,
23,652,174✔
587
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
588
                      .pUser = pRequest->pTscObj->user,
23,650,503✔
589
                      .timezone = pRequest->pTscObj->optionInfo.timezone,
23,648,360✔
590
                      .sysInfo = pRequest->pTscObj->sysInfo};
23,651,357✔
591

592
  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
23,650,859✔
593
}
594

595
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
788,070,952✔
596
                         const SExtSchema* pExtSchema, bool isStmt) {
597
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
788,070,952!
598
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
599
    return TSDB_CODE_INVALID_PARA;
×
600
  }
601

602
  pResInfo->numOfCols = numOfCols;
788,087,509✔
603
  if (pResInfo->fields != NULL) {
788,084,134✔
604
    taosMemoryFree(pResInfo->fields);
36,897!
605
  }
606
  if (pResInfo->userFields != NULL) {
788,069,934✔
607
    taosMemoryFree(pResInfo->userFields);
36,897!
608
  }
609
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
788,068,542!
610
  if (NULL == pResInfo->fields) return terrno;
788,043,903!
611
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
788,042,781!
612
  if (NULL == pResInfo->userFields) {
788,060,851!
613
    taosMemoryFree(pResInfo->fields);
×
614
    return terrno;
×
615
  }
616
  if (numOfCols != pResInfo->numOfCols) {
788,061,514!
617
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
618
    return TSDB_CODE_FAILED;
×
619
  }
620

621
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
2,147,483,647✔
622
    pResInfo->fields[i].type = pSchema[i].type;
2,147,483,647✔
623

624
    pResInfo->userFields[i].type = pSchema[i].type;
2,147,483,647✔
625
    // userFields must convert to type bytes, no matter isStmt or not
626
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
2,147,483,647✔
627
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
2,147,483,647✔
628
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
2,147,483,647!
629
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
2,088,105✔
630
    }
631

632
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
2,147,483,647!
633
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
2,147,483,647!
634
  }
635
  return TSDB_CODE_SUCCESS;
788,092,840✔
636
}
637

638
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
515,416,643✔
639
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
515,416,643!
640
      precision != TSDB_TIME_PRECISION_NANO) {
641
    return;
×
642
  }
643

644
  pResInfo->precision = precision;
515,416,643✔
645
}
646

647
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
432,217,052✔
648
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
432,217,052✔
649
  if (NULL == nodeList) {
432,252,980✔
650
    return terrno;
17✔
651
  }
652
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
432,254,806✔
653

654
  int32_t dbNum = taosArrayGetSize(pDbVgList);
432,254,806✔
655
  for (int32_t i = 0; i < dbNum; ++i) {
856,845,631✔
656
    SArray* pVg = taosArrayGetP(pDbVgList, i);
424,554,195✔
657
    if (NULL == pVg) {
424,566,641!
658
      continue;
×
659
    }
660
    int32_t vgNum = taosArrayGetSize(pVg);
424,566,641✔
661
    if (vgNum <= 0) {
424,565,787✔
662
      continue;
1,045,163✔
663
    }
664

665
    for (int32_t j = 0; j < vgNum; ++j) {
1,408,258,436✔
666
      SVgroupInfo* pInfo = taosArrayGet(pVg, j);
984,726,141✔
667
      if (NULL == pInfo) {
984,751,108!
668
        taosArrayDestroy(nodeList);
×
669
        return TSDB_CODE_OUT_OF_RANGE;
×
670
      }
671
      SQueryNodeLoad load = {0};
984,751,108✔
672
      load.addr.nodeId = pInfo->vgId;
984,731,480✔
673
      load.addr.epSet = pInfo->epSet;
984,750,923✔
674

675
      if (NULL == taosArrayPush(nodeList, &load)) {
984,658,126!
676
        taosArrayDestroy(nodeList);
×
677
        return terrno;
×
678
      }
679
    }
680
  }
681

682
  int32_t vnodeNum = taosArrayGetSize(nodeList);
432,291,436✔
683
  if (vnodeNum > 0) {
432,281,559✔
684
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
422,477,294✔
685
    goto _return;
422,476,033✔
686
  }
687

688
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
9,804,265✔
689
  if (mnodeNum <= 0) {
9,795,130!
690
    tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
×
691
    goto _return;
×
692
  }
693

694
  void* pData = taosArrayGet(pMnodeList, 0);
9,795,130✔
695
  if (NULL == pData) {
9,795,130!
696
    taosArrayDestroy(nodeList);
×
697
    return TSDB_CODE_OUT_OF_RANGE;
×
698
  }
699
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
9,795,130!
700
    taosArrayDestroy(nodeList);
×
701
    return terrno;
×
702
  }
703

704
  tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
9,795,130✔
705

706
_return:
230,567✔
707

708
  *pNodeList = nodeList;
432,270,858✔
709

710
  return TSDB_CODE_SUCCESS;
432,254,293✔
711
}
712

713
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
97,288,410✔
714
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
97,288,410✔
715
  if (NULL == nodeList) {
97,288,410!
716
    return terrno;
×
717
  }
718

719
  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
97,288,410✔
720
  if (qNodeNum > 0) {
97,288,410✔
721
    void* pData = taosArrayGet(pQnodeList, 0);
96,912,960✔
722
    if (NULL == pData) {
96,912,960!
723
      taosArrayDestroy(nodeList);
×
724
      return TSDB_CODE_OUT_OF_RANGE;
×
725
    }
726
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
96,912,960!
727
      taosArrayDestroy(nodeList);
×
728
      return terrno;
×
729
    }
730
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
96,912,960✔
731
    goto _return;
96,912,960✔
732
  }
733

734
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
375,450✔
735
  if (mnodeNum <= 0) {
375,450!
736
    tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
×
737
    goto _return;
×
738
  }
739

740
  void* pData = taosArrayGet(pMnodeList, 0);
375,450✔
741
  if (NULL == pData) {
375,450!
742
    taosArrayDestroy(nodeList);
×
743
    return TSDB_CODE_OUT_OF_RANGE;
×
744
  }
745
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
375,450!
746
    taosArrayDestroy(nodeList);
×
747
    return terrno;
×
748
  }
749

750
  tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
375,450!
751

752
_return:
×
753

754
  *pNodeList = nodeList;
97,288,410✔
755

756
  return TSDB_CODE_SUCCESS;
97,288,410✔
757
}
758

759
void freeVgList(void* list) {
23,424,212✔
760
  SArray* pList = *(SArray**)list;
23,424,212✔
761
  taosArrayDestroy(pList);
23,434,438✔
762
}
23,426,424✔
763

764
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
505,897,679✔
765
  SArray* pDbVgList = NULL;
505,897,679✔
766
  SArray* pQnodeList = NULL;
505,897,679✔
767
  FDelete fp = NULL;
505,897,679✔
768
  int32_t code = 0;
505,897,679✔
769

770
  switch (tsQueryPolicy) {
505,897,679✔
771
    case QUERY_POLICY_VNODE:
408,609,268✔
772
    case QUERY_POLICY_CLIENT: {
773
      if (pResultMeta) {
408,609,268✔
774
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
408,616,315✔
775
        if (NULL == pDbVgList) {
408,610,441!
776
          code = terrno;
×
777
          goto _return;
×
778
        }
779
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
408,610,441✔
780
        for (int32_t i = 0; i < dbNum; ++i) {
809,734,054✔
781
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
401,111,473✔
782
          if (pRes->code || NULL == pRes->pRes) {
401,111,106!
783
            continue;
×
784
          }
785

786
          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
802,225,661!
787
            code = terrno;
×
788
            goto _return;
×
789
          }
790
        }
791
      } else {
792
        fp = freeVgList;
132✔
793

794
        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
132✔
795
        if (dbNum > 0) {
132!
796
          SCatalog*     pCtg = NULL;
132✔
797
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
132✔
798
          code = catalogGetHandle(pInst->clusterId, &pCtg);
132✔
799
          if (code != TSDB_CODE_SUCCESS) {
132!
800
            goto _return;
×
801
          }
802

803
          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
132✔
804
          if (NULL == pDbVgList) {
132!
805
            code = terrno;
×
806
            goto _return;
×
807
          }
808
          SArray* pVgList = NULL;
132✔
809
          for (int32_t i = 0; i < dbNum; ++i) {
264✔
810
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
132✔
811
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
132✔
812
                                     .requestId = pRequest->requestId,
132✔
813
                                     .requestObjRefId = pRequest->self,
132✔
814
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
132✔
815

816
            // catalogGetDBVgList will handle dbFName == null.
817
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
132✔
818
            if (code) {
132!
819
              goto _return;
×
820
            }
821

822
            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
132!
823
              code = terrno;
×
824
              goto _return;
×
825
            }
826
          }
827
        }
828
      }
829

830
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
408,622,713✔
831
      break;
408,620,539✔
832
    }
833
    case QUERY_POLICY_HYBRID:
97,287,737✔
834
    case QUERY_POLICY_QNODE: {
835
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
97,784,975!
836
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
497,238✔
837
        if (pRes->code) {
497,238!
838
          pQnodeList = NULL;
×
839
        } else {
840
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
497,238✔
841
          if (NULL == pQnodeList) {
497,238!
842
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
843
            goto _return;
×
844
          }
845
        }
846
      } else {
847
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
96,790,499✔
848
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
96,791,172!
849
        if (pInst->pQnodeList) {
96,791,172!
850
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
96,791,172✔
851
          if (NULL == pQnodeList) {
96,791,172!
852
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
853
            goto _return;
×
854
          }
855
        }
856
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
96,791,172!
857
      }
858

859
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
97,288,410✔
860
      break;
97,288,410✔
861
    }
862
    default:
1,204✔
863
      tscError("unknown query policy: %d", tsQueryPolicy);
1,204!
864
      return TSDB_CODE_APP_ERROR;
×
865
  }
866

867
_return:
505,908,949✔
868
  taosArrayDestroyEx(pDbVgList, fp);
505,908,949✔
869
  taosArrayDestroy(pQnodeList);
505,900,623✔
870

871
  return code;
505,902,944✔
872
}
873

874
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
23,608,666✔
875
  SArray* pDbVgList = NULL;
23,608,666✔
876
  SArray* pQnodeList = NULL;
23,608,666✔
877
  int32_t code = 0;
23,614,376✔
878

879
  switch (tsQueryPolicy) {
23,614,376!
880
    case QUERY_POLICY_VNODE:
23,607,789✔
881
    case QUERY_POLICY_CLIENT: {
882
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
23,607,789✔
883
      if (dbNum > 0) {
23,647,490✔
884
        SCatalog*     pCtg = NULL;
23,440,867✔
885
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
23,437,535✔
886
        code = catalogGetHandle(pInst->clusterId, &pCtg);
23,437,346✔
887
        if (code != TSDB_CODE_SUCCESS) {
23,398,829!
888
          goto _return;
×
889
        }
890

891
        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
23,398,829✔
892
        if (NULL == pDbVgList) {
23,418,633!
893
          code = terrno;
×
894
          goto _return;
×
895
        }
896
        SArray* pVgList = NULL;
23,418,633✔
897
        for (int32_t i = 0; i < dbNum; ++i) {
46,831,004✔
898
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
23,412,875✔
899
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
23,441,521✔
900
                                   .requestId = pRequest->requestId,
23,428,214✔
901
                                   .requestObjRefId = pRequest->self,
23,414,894✔
902
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
23,426,544✔
903

904
          // catalogGetDBVgList will handle dbFName == null.
905
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
23,446,777✔
906
          if (code) {
23,427,521!
907
            goto _return;
×
908
          }
909

910
          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
23,439,486!
911
            code = terrno;
×
912
            goto _return;
×
913
          }
914
        }
915
      }
916

917
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
23,652,735✔
918
      break;
23,637,839✔
919
    }
920
    case QUERY_POLICY_HYBRID:
×
921
    case QUERY_POLICY_QNODE: {
922
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
×
923

924
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
×
925
      break;
×
926
    }
927
    default:
6,992✔
928
      tscError("unknown query policy: %d", tsQueryPolicy);
6,992!
929
      return TSDB_CODE_APP_ERROR;
×
930
  }
931

932
_return:
23,630,944✔
933

934
  taosArrayDestroyEx(pDbVgList, freeVgList);
23,636,673✔
935
  taosArrayDestroy(pQnodeList);
23,625,570✔
936

937
  return code;
23,638,220✔
938
}
939

940
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
23,620,439✔
941
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
23,620,439✔
942

943
  SExecResult      res = {0};
23,647,664✔
944
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
23,653,090✔
945
                           .requestId = pRequest->requestId,
23,642,578✔
946
                           .requestObjRefId = pRequest->self};
23,631,395✔
947
  SSchedulerReq    req = {
25,018,536✔
948
         .syncReq = true,
949
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
23,638,420✔
950
         .pConn = &conn,
951
         .pNodeList = pNodeList,
952
         .pDag = pDag,
953
         .sql = pRequest->sqlstr,
23,638,420✔
954
         .startTs = pRequest->metric.start,
23,639,723✔
955
         .execFp = NULL,
956
         .cbParam = NULL,
957
         .chkKillFp = chkRequestKilled,
958
         .chkKillParam = (void*)pRequest->self,
23,638,546✔
959
         .pExecRes = &res,
960
         .source = pRequest->source,
23,634,656✔
961
         .pWorkerCb = getTaskPoolWorkerCb(),
23,644,044✔
962
  };
963

964
  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
23,633,641✔
965

966
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
23,655,770✔
967
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
23,658,119!
968

969
  if (code != TSDB_CODE_SUCCESS) {
23,650,601!
970
    schedulerFreeJob(&pRequest->body.queryJob, 0);
×
971

972
    pRequest->code = code;
×
973
    terrno = code;
×
974
    return pRequest->code;
×
975
  }
976

977
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
23,650,601!
978
      TDMT_VND_CREATE_TABLE == pRequest->type) {
152,520✔
979
    pRequest->body.resInfo.numOfRows = res.numOfRows;
23,604,289✔
980
    if (TDMT_VND_SUBMIT == pRequest->type) {
23,605,468✔
981
      STscObj*            pTscObj = pRequest->pTscObj;
23,502,022✔
982
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
23,500,005✔
983
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
23,506,339✔
984
    }
985

986
    schedulerFreeJob(&pRequest->body.queryJob, 0);
23,609,231✔
987
  }
988

989
  pRequest->code = res.code;
23,654,571✔
990
  terrno = res.code;
23,652,575✔
991
  return pRequest->code;
23,645,761✔
992
}
993

994
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
1,718,198,319✔
995
  SArray*      pArray = NULL;
1,718,198,319✔
996
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
1,718,198,319✔
997
  if (NULL == pRsp->aCreateTbRsp) {
1,718,198,319✔
998
    return TSDB_CODE_SUCCESS;
1,663,816,223✔
999
  }
1000

1001
  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
54,425,972✔
1002
  for (int32_t i = 0; i < tbNum; ++i) {
125,610,349✔
1003
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
71,182,529✔
1004
    if (pTbRsp->pMeta) {
71,182,170✔
1005
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
68,051,509!
1006
    }
1007
  }
1008

1009
  return TSDB_CODE_SUCCESS;
54,427,820✔
1010
}
1011

1012
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
415,734,925✔
1013
  int32_t code = 0;
415,734,925✔
1014
  SArray* pArray = NULL;
415,734,925✔
1015
  SArray* pTbArray = (SArray*)res;
415,734,925✔
1016
  int32_t tbNum = taosArrayGetSize(pTbArray);
415,734,925✔
1017
  if (tbNum <= 0) {
415,729,998!
1018
    return TSDB_CODE_SUCCESS;
×
1019
  }
1020

1021
  pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
415,729,998✔
1022
  if (NULL == pArray) {
415,731,869!
1023
    return terrno;
×
1024
  }
1025

1026
  for (int32_t i = 0; i < tbNum; ++i) {
1,258,698,776✔
1027
    STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
842,972,181✔
1028
    if (NULL == tbInfo) {
842,969,862!
1029
      code = terrno;
×
1030
      goto _return;
×
1031
    }
1032
    STbSVersion tbSver = {
842,969,862✔
1033
        .tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion, .rver = tbInfo->rversion};
842,974,473✔
1034
    if (NULL == taosArrayPush(pArray, &tbSver)) {
842,967,844!
1035
      code = terrno;
×
1036
      goto _return;
×
1037
    }
1038
  }
1039

1040
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
415,726,595✔
1041
                           .requestId = pRequest->requestId,
415,729,305✔
1042
                           .requestObjRefId = pRequest->self,
415,729,971✔
1043
                           .mgmtEps = *epset};
1044

1045
  code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
415,729,307✔
1046

1047
_return:
415,735,143✔
1048

1049
  taosArrayDestroy(pArray);
415,724,434✔
1050
  return code;
415,730,102✔
1051
}
1052

1053
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
23,908,053✔
1054
  return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
23,908,053✔
1055
}
1056

1057
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
193,716,506✔
1058
  return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
193,716,506✔
1059
}
1060

1061
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
2,147,483,647✔
1062
  if (NULL == pRequest->body.resInfo.execRes.res) {
2,147,483,647✔
1063
    return pRequest->code;
133,558,583✔
1064
  }
1065

1066
  SCatalog*     pCatalog = NULL;
2,147,483,647✔
1067
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
2,147,483,647✔
1068

1069
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
2,147,483,647✔
1070
  if (code) {
2,147,483,647!
1071
    return code;
×
1072
  }
1073

1074
  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
2,147,483,647✔
1075
  SExecResult* pRes = &pRequest->body.resInfo.execRes;
2,147,483,647✔
1076

1077
  switch (pRes->msgType) {
2,147,483,647✔
1078
    case TDMT_VND_ALTER_TABLE:
15,140,728✔
1079
    case TDMT_MND_ALTER_STB: {
1080
      code = handleAlterTbExecRes(pRes->res, pCatalog);
15,140,728✔
1081
      break;
15,140,728✔
1082
    }
1083
    case TDMT_VND_CREATE_TABLE: {
97,604,072✔
1084
      SArray* pList = (SArray*)pRes->res;
97,604,072✔
1085
      int32_t num = taosArrayGetSize(pList);
97,624,460✔
1086
      for (int32_t i = 0; i < num; ++i) {
216,393,077✔
1087
        void* res = taosArrayGetP(pList, i);
118,739,457✔
1088
        // handleCreateTbExecRes will handle res == null
1089
        code = handleCreateTbExecRes(res, pCatalog);
118,740,506✔
1090
      }
1091
      break;
97,653,620✔
1092
    }
1093
    case TDMT_MND_CREATE_STB: {
1,089,914✔
1094
      code = handleCreateTbExecRes(pRes->res, pCatalog);
1,089,914✔
1095
      break;
1,089,914✔
1096
    }
1097
    case TDMT_VND_SUBMIT: {
1,718,207,538✔
1098
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
1,718,207,538✔
1099

1100
      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
1,718,246,207✔
1101
      break;
1,718,221,539✔
1102
    }
1103
    case TDMT_SCH_QUERY:
415,730,779✔
1104
    case TDMT_SCH_MERGE_QUERY: {
1105
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
415,730,779✔
1106
      break;
415,703,958✔
1107
    }
1108
    default:
445✔
1109
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
445!
1110
               pRequest->type, pRequest->requestId);
1111
      code = TSDB_CODE_APP_ERROR;
×
1112
  }
1113

1114
  return code;
2,147,483,647✔
1115
}
1116

1117
static bool incompletaFileParsing(SNode* pStmt) {
2,147,483,647✔
1118
  return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
2,147,483,647!
1119
}
1120

1121
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
×
1122
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
×
1123

1124
  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
×
1125
  if (TSDB_CODE_SUCCESS == code) {
×
1126
    int64_t analyseStart = taosGetTimestampUs();
×
1127
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
×
1128
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
×
1129
  }
1130

1131
  if (TSDB_CODE_SUCCESS == code) {
×
1132
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
×
1133
  }
1134

1135
  code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
×
1136
  handleQueryAnslyseRes(pWrapper, NULL, code);
×
1137
}
×
1138

1139
void returnToUser(SRequestObj* pRequest) {
52,063,641✔
1140
  if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
52,063,641!
1141
    // return to client
1142
    doRequestCallback(pRequest, pRequest->code);
52,063,641✔
1143
    return;
52,063,641✔
1144
  }
1145

1146
  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
×
1147
  if (pUserReq) {
×
1148
    pUserReq->code = pRequest->code;
×
1149
    // return to client
1150
    doRequestCallback(pUserReq, pUserReq->code);
×
1151
    (void)releaseRequest(pRequest->relation.userRefId);
×
1152
    return;
×
1153
  } else {
1154
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1155
             pRequest->relation.userRefId, pRequest->requestId);
1156
  }
1157
}
1158

1159
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
×
1160
  int64_t     lastTs = 0;
×
1161
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
×
1162
  int32_t     numOfFields = taos_num_fields(pRes);
×
1163

1164
  int32_t code = createDataBlock(pBlock);
×
1165
  if (code) {
×
1166
    return code;
×
1167
  }
1168

1169
  for (int32_t i = 0; i < numOfFields; ++i) {
×
1170
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
×
1171
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
×
1172
    if (TSDB_CODE_SUCCESS != code) {
×
1173
      blockDataDestroy(*pBlock);
×
1174
      return code;
×
1175
    }
1176
  }
1177

1178
  code = blockDataEnsureCapacity(*pBlock, numOfRows);
×
1179
  if (TSDB_CODE_SUCCESS != code) {
×
1180
    blockDataDestroy(*pBlock);
×
1181
    return code;
×
1182
  }
1183

1184
  for (int32_t i = 0; i < numOfRows; ++i) {
×
1185
    TAOS_ROW pRow = taos_fetch_row(pRes);
×
1186
    if (NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
×
1187
      tscError("invalid data from vnode");
×
1188
      blockDataDestroy(*pBlock);
×
1189
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1190
    }
1191
    int64_t ts = *(int64_t*)pRow[0];
×
1192
    if (lastTs < ts) {
×
1193
      lastTs = ts;
×
1194
    }
1195

1196
    for (int32_t j = 0; j < numOfFields; ++j) {
×
1197
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
×
1198
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
×
1199
      if (TSDB_CODE_SUCCESS != code) {
×
1200
        blockDataDestroy(*pBlock);
×
1201
        return code;
×
1202
      }
1203
    }
1204

1205
    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
×
1206
            *(int64_t*)pRow[2]);
1207
  }
1208

1209
  (*pBlock)->info.window.ekey = lastTs;
×
1210
  (*pBlock)->info.rows = numOfRows;
×
1211

1212
  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
×
1213
  return TSDB_CODE_SUCCESS;
×
1214
}
1215

1216
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
×
1217
  SRequestObj* pRequest = (SRequestObj*)res;
×
1218
  if (pRequest->code) {
×
1219
    returnToUser(pRequest);
×
1220
    return;
×
1221
  }
1222

1223
  SSDataBlock* pBlock = NULL;
×
1224
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
×
1225
  if (TSDB_CODE_SUCCESS != pRequest->code) {
×
1226
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
×
1227
             tstrerror(pRequest->code));
1228
    returnToUser(pRequest);
×
1229
    return;
×
1230
  }
1231

1232
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1233
  if (pNextReq) {
×
1234
    continuePostSubQuery(pNextReq, pBlock);
×
1235
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1236
  } else {
1237
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1238
             pRequest->relation.nextRefId, pRequest->requestId);
1239
  }
1240

1241
  blockDataDestroy(pBlock);
×
1242
}
1243

1244
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
×
1245
  SRequestObj* pRequest = pWrapper->pRequest;
×
1246
  if (TD_RES_QUERY(pRequest)) {
×
1247
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
×
1248
    return;
×
1249
  }
1250

1251
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1252
  if (pNextReq) {
×
1253
    continuePostSubQuery(pNextReq, NULL);
×
1254
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1255
  } else {
1256
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1257
             pRequest->relation.nextRefId, pRequest->requestId);
1258
  }
1259
}
1260

1261
// todo refacto the error code  mgmt
1262
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
2,147,483,647✔
1263
  SSqlCallbackWrapper* pWrapper = param;
2,147,483,647✔
1264
  SRequestObj*         pRequest = pWrapper->pRequest;
2,147,483,647✔
1265
  STscObj*             pTscObj = pRequest->pTscObj;
2,147,483,647✔
1266

1267
  pRequest->code = code;
2,147,483,647✔
1268
  if (pResult) {
2,147,483,647✔
1269
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
2,147,483,647✔
1270
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
2,147,483,647!
1271
  }
1272

1273
  int32_t type = pRequest->type;
2,147,483,647✔
1274
  if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
2,147,483,647✔
1275
    if (pResult) {
1,806,190,179✔
1276
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;
1,806,145,268✔
1277

1278
      // record the insert rows
1279
      if (TDMT_VND_SUBMIT == type) {
1,806,149,031✔
1280
        SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
1,695,137,083✔
1281
        (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
1,695,151,474✔
1282
      }
1283
    }
1284
    schedulerFreeJob(&pRequest->body.queryJob, 0);
1,806,224,018✔
1285
  }
1286

1287
  taosMemoryFree(pResult);
2,147,483,647!
1288
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
2,147,483,647✔
1289
           pRequest->requestId);
1290

1291
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL &&
2,147,483,647!
1292
      pRequest->stmtBindVersion == 0) {
207,123✔
1293
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
207,090✔
1294
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
1295
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
207,090!
1296
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1297
    }
1298
    restartAsyncQuery(pRequest, code);
207,090✔
1299
    return;
207,090✔
1300
  }
1301

1302
  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
2,147,483,647!
1303
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
2,147,483,647!
1304
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
28,255,794!
1305
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1306
    }
1307
  }
1308

1309
  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
2,147,483,647✔
1310
  int32_t code1 = handleQueryExecRsp(pRequest);
2,147,483,647✔
1311
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
2,147,483,647!
1312
    pRequest->code = code1;
×
1313
  }
1314

1315
  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
2,147,483,647✔
1316
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
2,147,483,647✔
1317
    continueInsertFromCsv(pWrapper, pRequest);
30,802✔
1318
    return;
30,802✔
1319
  }
1320

1321
  if (pRequest->relation.nextRefId) {
2,147,483,647!
1322
    handlePostSubQuery(pWrapper);
×
1323
  } else {
1324
    destorySqlCallbackWrapper(pWrapper);
2,147,483,647✔
1325
    pRequest->pWrapper = NULL;
2,147,483,647✔
1326

1327
    // return to client
1328
    doRequestCallback(pRequest, code);
2,147,483,647✔
1329
  }
1330
}
1331

1332
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
24,865,345✔
1333
  int32_t code = 0;
24,865,345✔
1334
  int32_t subplanNum = 0;
24,865,345✔
1335

1336
  if (pQuery->pRoot) {
24,865,345✔
1337
    pRequest->stmtType = pQuery->pRoot->type;
23,649,528✔
1338
  }
1339

1340
  if (pQuery->pRoot && !pRequest->inRetry) {
24,873,558!
1341
    STscObj*            pTscObj = pRequest->pTscObj;
23,652,222✔
1342
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
23,648,660✔
1343
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
23,653,120✔
1344
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
23,623,922✔
1345
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
27,843!
1346
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
27,843✔
1347
    }
1348
  }
1349

1350
  pRequest->body.execMode = pQuery->execMode;
24,882,930✔
1351
  switch (pQuery->execMode) {
24,895,851!
1352
    case QUERY_EXEC_MODE_LOCAL:
×
1353
      if (!pRequest->validateOnly) {
×
1354
        if (NULL == pQuery->pRoot) {
×
1355
          terrno = TSDB_CODE_INVALID_PARA;
×
1356
          code = terrno;
×
1357
        } else {
1358
          code = execLocalCmd(pRequest, pQuery);
×
1359
        }
1360
      }
1361
      break;
×
1362
    case QUERY_EXEC_MODE_RPC:
1,239,589✔
1363
      if (!pRequest->validateOnly) {
1,239,589!
1364
        code = execDdlQuery(pRequest, pQuery);
1,239,589✔
1365
      }
1366
      break;
1,239,589✔
1367
    case QUERY_EXEC_MODE_SCHEDULE: {
23,634,315✔
1368
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
23,634,315✔
1369
      if (NULL == pMnodeList) {
23,638,659!
1370
        code = terrno;
×
1371
        break;
×
1372
      }
1373
      SQueryPlan* pDag = NULL;
23,638,659✔
1374
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
23,638,659✔
1375
      if (TSDB_CODE_SUCCESS == code) {
23,620,706✔
1376
        pRequest->body.subplanNum = pDag->numOfSubplans;
23,617,369✔
1377
        if (!pRequest->validateOnly) {
23,644,679!
1378
          SArray* pNodeList = NULL;
23,634,150✔
1379
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
23,630,802✔
1380
          if (TSDB_CODE_SUCCESS == code) {
23,625,112!
1381
            code = scheduleQuery(pRequest, pDag, pNodeList);
23,630,822✔
1382
          }
1383
          taosArrayDestroy(pNodeList);
23,634,434✔
1384
        }
1385
      }
1386
      taosArrayDestroy(pMnodeList);
23,650,407✔
1387
      break;
23,654,759✔
1388
    }
1389
    case QUERY_EXEC_MODE_EMPTY_RESULT:
×
1390
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
1391
      break;
×
1392
    default:
×
1393
      break;
×
1394
  }
1395

1396
  if (!keepQuery) {
24,895,516!
1397
    qDestroyQuery(pQuery);
×
1398
  }
1399

1400
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
24,895,516!
1401
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
152,202!
1402
    if (TSDB_CODE_SUCCESS != ret) {
152,202!
1403
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
×
1404
               pRequest->requestId);
1405
    }
1406
  }
1407

1408
  if (TSDB_CODE_SUCCESS == code) {
24,881,422✔
1409
    code = handleQueryExecRsp(pRequest);
24,878,761✔
1410
  }
1411

1412
  if (TSDB_CODE_SUCCESS != code) {
24,880,728✔
1413
    pRequest->code = code;
80,185✔
1414
  }
1415

1416
  if (res) {
24,880,728!
1417
    *res = pRequest->body.resInfo.execRes.res;
×
1418
    pRequest->body.resInfo.execRes.res = NULL;
×
1419
  }
1420
}
24,880,728✔
1421

1422
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
2,147,483,647✔
1423
                                 SSqlCallbackWrapper* pWrapper) {
1424
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
1425
  pRequest->type = pQuery->msgType;
2,147,483,647✔
1426
  SArray*     pMnodeList = NULL;
2,147,483,647✔
1427
  SQueryPlan* pDag = NULL;
2,147,483,647✔
1428
  int64_t     st = taosGetTimestampUs();
2,147,483,647✔
1429

1430
  if (!pRequest->parseOnly) {
2,147,483,647✔
1431
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
2,147,483,647✔
1432
    if (NULL == pMnodeList) {
2,147,483,647!
1433
      code = terrno;
×
1434
    }
1435
    SPlanContext cxt = {.queryId = pRequest->requestId,
2,147,483,647✔
1436
                        .acctId = pRequest->pTscObj->acctId,
2,147,483,647✔
1437
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
2,147,483,647✔
1438
                        .pAstRoot = pQuery->pRoot,
2,147,483,647✔
1439
                        .showRewrite = pQuery->showRewrite,
2,147,483,647!
1440
                        .isView = pWrapper->pParseCtx->isView,
2,147,483,647!
1441
                        .isAudit = pWrapper->pParseCtx->isAudit,
2,147,483,647!
1442
                        .pMsg = pRequest->msgBuf,
2,147,483,647✔
1443
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1444
                        .pUser = pRequest->pTscObj->user,
2,147,483,647✔
1445
                        .sysInfo = pRequest->pTscObj->sysInfo,
2,147,483,647✔
1446
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
2,147,483,647✔
1447
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
2,147,483,647✔
1448
    if (TSDB_CODE_SUCCESS == code) {
2,147,483,647✔
1449
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
2,147,483,647✔
1450
    }
1451
    if (code) {
2,147,483,647✔
1452
      tscError("req:0x%" PRIx64 ", failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
877,140!
1453
               pRequest->requestId);
1454
    } else {
1455
      pRequest->body.subplanNum = pDag->numOfSubplans;
2,147,483,647✔
1456
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
2,147,483,647✔
1457
    }
1458
  }
1459

1460
  pRequest->metric.execStart = taosGetTimestampUs();
2,147,483,647✔
1461
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
2,147,483,647✔
1462

1463
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
2,147,483,647✔
1464
    SArray* pNodeList = NULL;
2,147,483,647✔
1465
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
2,147,483,647✔
1466
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
505,892,219✔
1467
    }
1468

1469
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
2,147,483,647✔
1470
                             .requestId = pRequest->requestId,
2,147,483,647✔
1471
                             .requestObjRefId = pRequest->self};
2,147,483,647✔
1472
    SSchedulerReq    req = {
2,147,483,647✔
1473
           .syncReq = false,
1474
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
2,147,483,647✔
1475
           .pConn = &conn,
1476
           .pNodeList = pNodeList,
1477
           .pDag = pDag,
1478
           .allocatorRefId = pRequest->allocatorRefId,
2,147,483,647✔
1479
           .sql = pRequest->sqlstr,
2,147,483,647✔
1480
           .startTs = pRequest->metric.start,
2,147,483,647✔
1481
           .execFp = schedulerExecCb,
1482
           .cbParam = pWrapper,
1483
           .chkKillFp = chkRequestKilled,
1484
           .chkKillParam = (void*)pRequest->self,
2,147,483,647✔
1485
           .pExecRes = NULL,
1486
           .source = pRequest->source,
2,147,483,647✔
1487
           .pWorkerCb = getTaskPoolWorkerCb(),
2,147,483,647✔
1488
    };
1489
    if (TSDB_CODE_SUCCESS == code) {
2,147,483,647!
1490
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
2,147,483,647✔
1491
    }
1492

1493
    taosArrayDestroy(pNodeList);
2,147,483,647✔
1494
  } else {
1495
    qDestroyQueryPlan(pDag);
1,427,628✔
1496
    tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
1,453,469!
1497
             pRequest->requestId);
1498
    destorySqlCallbackWrapper(pWrapper);
1,453,469✔
1499
    pRequest->pWrapper = NULL;
1,453,469✔
1500
    if (TSDB_CODE_SUCCESS != code) {
1,453,469✔
1501
      pRequest->code = terrno;
877,140✔
1502
    }
1503

1504
    doRequestCallback(pRequest, code);
1,453,469✔
1505
  }
1506

1507
  // todo not to be released here
1508
  taosArrayDestroy(pMnodeList);
2,147,483,647✔
1509

1510
  return code;
2,147,483,647✔
1511
}
1512

1513
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
2,147,483,647✔
1514
  int32_t code = 0;
2,147,483,647✔
1515

1516
  if (pRequest->parseOnly) {
2,147,483,647✔
1517
    doRequestCallback(pRequest, 0);
881,303✔
1518
    return;
881,303✔
1519
  }
1520

1521
  pRequest->body.execMode = pQuery->execMode;
2,147,483,647✔
1522
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
2,147,483,647✔
1523
    destorySqlCallbackWrapper(pWrapper);
67,745,085✔
1524
    pRequest->pWrapper = NULL;
67,747,106✔
1525
  }
1526

1527
  if (pQuery->pRoot && !pRequest->inRetry) {
2,147,483,647!
1528
    STscObj*            pTscObj = pRequest->pTscObj;
2,147,483,647✔
1529
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
2,147,483,647✔
1530
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
2,147,483,647✔
1531
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
1,850,768,375✔
1532
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
1,695,051,929✔
1533
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
730,819,017✔
1534
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
421,726,797✔
1535
    }
1536
  }
1537

1538
  switch (pQuery->execMode) {
2,147,483,647!
1539
    case QUERY_EXEC_MODE_LOCAL:
19,880,398✔
1540
      asyncExecLocalCmd(pRequest, pQuery);
19,880,398✔
1541
      break;
19,880,398✔
1542
    case QUERY_EXEC_MODE_RPC:
45,460,772✔
1543
      code = asyncExecDdlQuery(pRequest, pQuery);
45,460,772✔
1544
      break;
45,463,255✔
1545
    case QUERY_EXEC_MODE_SCHEDULE: {
2,147,483,647✔
1546
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
2,147,483,647✔
1547
      break;
2,147,483,647✔
1548
    }
1549
    case QUERY_EXEC_MODE_EMPTY_RESULT:
2,405,225✔
1550
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
2,405,225✔
1551
      doRequestCallback(pRequest, 0);
2,405,225✔
1552
      break;
2,405,225✔
1553
    default:
×
1554
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
×
1555
      doRequestCallback(pRequest, -1);
×
1556
      break;
×
1557
  }
1558
}
1559

1560
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
46,133✔
1561
  SCatalog* pCatalog = NULL;
46,133✔
1562
  int32_t   code = 0;
46,133✔
1563
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
46,133✔
1564
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
46,133✔
1565

1566
  if (dbNum <= 0 && tblNum <= 0) {
46,133!
1567
    return TSDB_CODE_APP_ERROR;
46,018✔
1568
  }
1569

1570
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
115✔
1571
  if (code != TSDB_CODE_SUCCESS) {
115!
1572
    return code;
×
1573
  }
1574

1575
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
115✔
1576
                           .requestId = pRequest->requestId,
115✔
1577
                           .requestObjRefId = pRequest->self,
115✔
1578
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
115✔
1579

1580
  for (int32_t i = 0; i < dbNum; ++i) {
230✔
1581
    char* dbFName = taosArrayGet(pRequest->dbList, i);
115✔
1582

1583
    // catalogRefreshDBVgInfo will handle dbFName == null.
1584
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
115✔
1585
    if (code != TSDB_CODE_SUCCESS) {
115!
1586
      return code;
×
1587
    }
1588
  }
1589

1590
  for (int32_t i = 0; i < tblNum; ++i) {
230✔
1591
    SName* tableName = taosArrayGet(pRequest->tableList, i);
115✔
1592

1593
    // catalogRefreshTableMeta will handle tableName == null.
1594
    code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
115✔
1595
    if (code != TSDB_CODE_SUCCESS) {
115!
1596
      return code;
×
1597
    }
1598
  }
1599

1600
  return code;
115✔
1601
}
1602

1603
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
32,112,165✔
1604
  SCatalog* pCatalog = NULL;
32,112,165✔
1605
  int32_t   tbNum = taosArrayGetSize(tbList);
32,112,165✔
1606
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
32,112,165✔
1607
  if (code != TSDB_CODE_SUCCESS) {
32,112,165!
1608
    return code;
×
1609
  }
1610

1611
  if (isView) {
32,112,165✔
1612
    for (int32_t i = 0; i < tbNum; ++i) {
2,085,086✔
1613
      SName* pViewName = taosArrayGet(tbList, i);
1,042,543✔
1614
      char   dbFName[TSDB_DB_FNAME_LEN];
1,036,568✔
1615
      if (NULL == pViewName) {
1,042,543!
1616
        continue;
×
1617
      }
1618
      (void)tNameGetFullDbName(pViewName, dbFName);
1,042,543✔
1619
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
1,042,543!
1620
    }
1621
  } else {
1622
    for (int32_t i = 0; i < tbNum; ++i) {
55,542,012✔
1623
      SName* pTbName = taosArrayGet(tbList, i);
24,472,390✔
1624
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
24,472,390!
1625
    }
1626
  }
1627

1628
  return TSDB_CODE_SUCCESS;
32,112,165✔
1629
}
1630

1631
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
13,421,880✔
1632
  pEpSet->version = 0;
13,421,880✔
1633

1634
  // init mnode ip set
1635
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
13,423,435✔
1636
  mgmtEpSet->numOfEps = 0;
13,424,305✔
1637
  mgmtEpSet->inUse = 0;
13,423,435✔
1638

1639
  if (firstEp && firstEp[0] != 0) {
13,424,305✔
1640
    if (strlen(firstEp) >= TSDB_EP_LEN) {
13,422,397!
1641
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1642
      return -1;
×
1643
    }
1644

1645
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
13,422,397✔
1646
    if (code != TSDB_CODE_SUCCESS) {
13,418,938!
1647
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1648
      return terrno;
×
1649
    }
1650
    // uint32_t addr = 0;
1651
    SIpAddr addr = {0};
13,418,938✔
1652
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
13,416,936✔
1653
    if (code) {
13,419,448✔
1654
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
3,109!
1655
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1656
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
3,230!
1657
    } else {
1658
      mgmtEpSet->numOfEps++;
13,416,360✔
1659
    }
1660
  }
1661

1662
  if (secondEp && secondEp[0] != 0) {
13,420,148✔
1663
    if (strlen(secondEp) >= TSDB_EP_LEN) {
9,242,422!
1664
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1665
      return terrno;
×
1666
    }
1667

1668
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
9,242,422✔
1669
    if (code != TSDB_CODE_SUCCESS) {
9,246,863!
1670
      return code;
×
1671
    }
1672
    SIpAddr addr = {0};
9,246,863✔
1673
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
9,246,863✔
1674
    if (code) {
9,243,236✔
1675
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
426!
1676
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1677
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
×
1678
    } else {
1679
      mgmtEpSet->numOfEps++;
9,242,810✔
1680
    }
1681
  }
1682

1683
  if (mgmtEpSet->numOfEps == 0) {
13,426,057✔
1684
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
3,230✔
1685
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
3,230✔
1686
  }
1687

1688
  return 0;
13,416,159✔
1689
}
1690

1691
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
13,422,932✔
1692
                        SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1693
  *pTscObj = NULL;
13,422,932✔
1694
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
13,422,932✔
1695
  if (TSDB_CODE_SUCCESS != code) {
13,422,932!
1696
    return code;
×
1697
  }
1698

1699
  SRequestObj* pRequest = NULL;
13,422,932✔
1700
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
13,422,932✔
1701
  if (TSDB_CODE_SUCCESS != code) {
13,421,459!
1702
    destroyTscObj(*pTscObj);
×
1703
    return code;
×
1704
  }
1705

1706
  pRequest->sqlstr = taosStrdup("taos_connect");
13,421,459!
1707
  if (pRequest->sqlstr) {
13,422,648!
1708
    pRequest->sqlLen = strlen(pRequest->sqlstr);
13,422,648!
1709
  } else {
1710
    return terrno;
×
1711
  }
1712

1713
  SMsgSendInfo* body = NULL;
13,422,648✔
1714
  code = buildConnectMsg(pRequest, &body);
13,422,648✔
1715
  if (TSDB_CODE_SUCCESS != code) {
13,421,801!
1716
    destroyTscObj(*pTscObj);
×
1717
    return code;
×
1718
  }
1719

1720
  // int64_t transporterId = 0;
1721
  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
13,421,801✔
1722
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
13,420,690✔
1723
  if (TSDB_CODE_SUCCESS != code) {
13,421,407!
1724
    destroyTscObj(*pTscObj);
×
1725
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1726
    return code;
×
1727
  }
1728
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
13,421,407!
1729
    destroyTscObj(*pTscObj);
×
1730
    tscError("failed to wait sem, code:%s", terrstr());
×
1731
    return terrno;
×
1732
  }
1733
  if (pRequest->code != TSDB_CODE_SUCCESS) {
13,422,932✔
1734
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
25,839!
1735
    tscError("failed to connect to server, reason: %s", errorMsg);
25,839!
1736

1737
    terrno = pRequest->code;
25,839✔
1738
    destroyRequest(pRequest);
25,839✔
1739
    taos_close_internal(*pTscObj);
25,839✔
1740
    *pTscObj = NULL;
25,839✔
1741
    return terrno;
25,839✔
1742
  } else {
1743
    tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
13,397,093!
1744
            (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1745
    destroyRequest(pRequest);
13,397,093✔
1746
  }
1747
  return code;
13,395,762✔
1748
}
1749

1750
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
13,421,175✔
1751
  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
13,421,175!
1752
  if (*pMsgSendInfo == NULL) {
13,421,459!
1753
    return terrno;
×
1754
  }
1755

1756
  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
13,421,459✔
1757

1758
  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
13,421,459✔
1759
  (*pMsgSendInfo)->requestId = pRequest->requestId;
13,421,459✔
1760
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
13,422,790✔
1761
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
13,419,986!
1762
  if (NULL == (*pMsgSendInfo)->param) {
13,422,506!
1763
    taosMemoryFree(*pMsgSendInfo);
×
1764
    return terrno;
×
1765
  }
1766

1767
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
13,421,175✔
1768

1769
  SConnectReq connectReq = {0};
13,421,317✔
1770
  STscObj*    pObj = pRequest->pTscObj;
13,421,317✔
1771

1772
  char* db = getDbOfConnection(pObj);
13,422,506✔
1773
  if (db != NULL) {
13,422,790✔
1774
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
5,107,074!
1775
  } else if (terrno) {
8,315,716!
1776
    taosMemoryFree(*pMsgSendInfo);
×
1777
    return terrno;
×
1778
  }
1779
  taosMemoryFreeClear(db);
13,422,790!
1780

1781
  connectReq.connType = pObj->connType;
13,424,405✔
1782
  connectReq.pid = appInfo.pid;
13,423,074✔
1783
  connectReq.startTime = appInfo.startTime;
13,421,743✔
1784

1785
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
13,421,743!
1786
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
13,421,885!
1787
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
13,420,554!
1788
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
13,420,554!
1789

1790
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
13,420,554✔
1791
  void*   pReq = taosMemoryMalloc(contLen);
13,422,506!
1792
  if (NULL == pReq) {
13,420,412!
1793
    taosMemoryFree(*pMsgSendInfo);
×
1794
    return terrno;
×
1795
  }
1796

1797
  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
13,420,412✔
1798
    taosMemoryFree(*pMsgSendInfo);
284!
1799
    taosMemoryFree(pReq);
×
1800
    return terrno;
×
1801
  }
1802

1803
  (*pMsgSendInfo)->msgInfo.len = contLen;
13,421,096✔
1804
  (*pMsgSendInfo)->msgInfo.pData = pReq;
13,418,690✔
1805
  return TSDB_CODE_SUCCESS;
13,417,486✔
1806
}
1807

1808
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
2,147,483,647✔
1809
  if (NULL == pEpSet) {
2,147,483,647✔
1810
    return;
2,147,483,647✔
1811
  }
1812

1813
  switch (pSendInfo->target.type) {
111,491,445✔
1814
    case TARGET_TYPE_MNODE:
2,240✔
1815
      if (NULL == pTscObj) {
2,240!
1816
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1817
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1818
        return;
×
1819
      }
1820

1821
      SEpSet  originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
2,240✔
1822
      SEpSet* pOrig = &originEpset;
2,240✔
1823
      SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
2,240✔
1824
      SEp*    pNewEp = &pEpSet->eps[pEpSet->inUse];
2,240✔
1825
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
2,240!
1826
               pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
1827
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
2,240✔
1828
      break;
766,098✔
1829
    case TARGET_TYPE_VNODE: {
97,303,788✔
1830
      if (NULL == pTscObj) {
97,302,620!
1831
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1832
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1833
        return;
×
1834
      }
1835

1836
      SCatalog* pCatalog = NULL;
97,302,620✔
1837
      int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
97,302,620✔
1838
      if (code != TSDB_CODE_SUCCESS) {
97,301,550!
1839
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1840
                 tstrerror(code));
1841
        return;
×
1842
      }
1843

1844
      code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
97,301,550✔
1845
      if (code != TSDB_CODE_SUCCESS) {
97,305,402!
1846
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1847
                 tstrerror(code));
1848
        return;
×
1849
      }
1850
      taosMemoryFreeClear(pSendInfo->target.dbFName);
97,305,402!
1851
      break;
97,304,290✔
1852
    }
1853
    default:
14,193,237✔
1854
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
14,193,237!
1855
      break;
14,193,238✔
1856
  }
1857
}
1858

1859
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
2,147,483,647✔
1860
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
2,147,483,647✔
1861
  if (pMsg->info.ahandle == NULL) {
2,147,483,647✔
1862
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
12,669,814!
1863
    rpcFreeCont(pMsg->pCont);
12,669,814✔
1864
    taosMemoryFree(pEpSet);
12,669,814!
1865
    return TSDB_CODE_TSC_INTERNAL_ERROR;
12,669,814✔
1866
  }
1867

1868
  STscObj* pTscObj = NULL;
2,147,483,647✔
1869

1870
  STraceId* trace = &pMsg->info.traceId;
2,147,483,647✔
1871
  char      tbuf[40] = {0};
2,147,483,647✔
1872
  TRACE_TO_STR(trace, tbuf);
2,147,483,647✔
1873

1874
  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
2,147,483,647!
1875
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));
1876

1877
  if (pSendInfo->requestObjRefId != 0) {
2,147,483,647✔
1878
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
2,147,483,647✔
1879
    if (pRequest) {
2,147,483,647✔
1880
      if (pRequest->self != pSendInfo->requestObjRefId) {
2,147,483,647!
1881
        tscError("doProcessMsgFromServer req:0x%" PRId64 " != pSendInfo->requestObjRefId:0x%" PRId64, pRequest->self,
×
1882
                 pSendInfo->requestObjRefId);
1883

1884
        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
×
1885
          tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1886
        }
1887
        rpcFreeCont(pMsg->pCont);
×
1888
        taosMemoryFree(pEpSet);
×
1889
        destroySendMsgInfo(pSendInfo);
×
1890
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1891
      }
1892
      pTscObj = pRequest->pTscObj;
2,147,483,647✔
1893
    }
1894
  }
1895

1896
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
2,147,483,647✔
1897

1898
  SDataBuf buf = {.msgType = pMsg->msgType,
2,147,483,647✔
1899
                  .len = pMsg->contLen,
2,147,483,647✔
1900
                  .pData = NULL,
1901
                  .handle = pMsg->info.handle,
2,147,483,647✔
1902
                  .handleRefId = pMsg->info.refId,
2,147,483,647✔
1903
                  .pEpSet = pEpSet};
1904

1905
  if (pMsg->contLen > 0) {
2,147,483,647✔
1906
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
2,147,483,647!
1907
    if (buf.pData == NULL) {
2,147,483,647!
1908
      pMsg->code = terrno;
×
1909
    } else {
1910
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
2,147,483,647!
1911
    }
1912
  }
1913

1914
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
2,147,483,647✔
1915

1916
  if (pTscObj) {
2,147,483,647✔
1917
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
2,147,483,647✔
1918
    if (TSDB_CODE_SUCCESS != code) {
2,147,483,647✔
1919
      tscError("doProcessMsgFromServer taosReleaseRef failed");
25!
1920
      terrno = code;
25✔
1921
      pMsg->code = code;
25✔
1922
    }
1923
  }
1924

1925
  rpcFreeCont(pMsg->pCont);
2,147,483,647✔
1926
  destroySendMsgInfo(pSendInfo);
2,147,483,647✔
1927
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
1928
}
1929

1930
int32_t doProcessMsgFromServer(void* param) {
2,147,483,647✔
1931
  AsyncArg* arg = (AsyncArg*)param;
2,147,483,647✔
1932
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
2,147,483,647✔
1933
  taosMemoryFree(arg);
2,147,483,647!
1934
  return code;
2,147,483,647✔
1935
}
1936

1937
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
2,147,483,647✔
1938
  int32_t code = 0;
2,147,483,647✔
1939
  SEpSet* tEpSet = NULL;
2,147,483,647✔
1940

1941
  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);
2,147,483,647✔
1942

1943
  if (pEpSet != NULL) {
2,147,483,647✔
1944
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
111,494,857!
1945
    if (NULL == tEpSet) {
111,498,465!
1946
      code = terrno;
×
1947
      pMsg->code = terrno;
×
1948
      goto _exit;
×
1949
    }
1950
    (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
111,498,465!
1951
  }
1952

1953
  // pMsg is response msg
1954
  if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
2,147,483,647✔
1955
    // restore origin code
1956
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
13,421,797!
1957
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
×
1958
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
13,422,932!
1959
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
×
1960
    }
1961
  } else {
1962
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
1963
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
2,147,483,647!
1964
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
×
1965
    }
1966
  }
1967

1968
  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
2,147,483,647!
1969
  if (NULL == arg) {
2,147,483,647!
1970
    code = terrno;
×
1971
    pMsg->code = code;
×
1972
    goto _exit;
×
1973
  }
1974

1975
  arg->msg = *pMsg;
2,147,483,647✔
1976
  arg->pEpset = tEpSet;
2,147,483,647✔
1977

1978
  if ((code = taosAsyncExec(doProcessMsgFromServer, arg, NULL)) != 0) {
2,147,483,647✔
1979
    pMsg->code = code;
120✔
1980
    taosMemoryFree(arg);
120!
1981
    goto _exit;
×
1982
  }
1983
  return;
2,147,483,647✔
1984

1985
_exit:
×
1986
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
×
1987
  code = doProcessMsgFromServerImpl(pMsg, tEpSet);
×
1988
  if (code != 0) {
×
1989
    tscError("failed to sched msg to tsc, tsc ready quit");
×
1990
  }
1991
}
1992

1993
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
3,267✔
1994
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
3,267!
1995
  if (user == NULL) {
3,267!
1996
    user = TSDB_DEFAULT_USER;
×
1997
  }
1998

1999
  if (auth == NULL) {
3,267!
2000
    tscError("No auth info is given, failed to connect to server");
×
2001
    return NULL;
×
2002
  }
2003

2004
  STscObj* pObj = NULL;
3,267✔
2005
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
3,267✔
2006
  if (TSDB_CODE_SUCCESS == code) {
3,267✔
2007
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
964!
2008
    if (NULL == rid) {
964!
2009
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2010
    }
2011
    *rid = pObj->id;
964✔
2012
    return (TAOS*)rid;
964✔
2013
  }
2014

2015
  return NULL;
2,303✔
2016
}
2017

2018
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
2019
//                      const char* db, int dbLen, uint16_t port) {
2020
//   char ipStr[TSDB_EP_LEN] = {0};
2021
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
2022
//   char userStr[TSDB_USER_LEN] = {0};
2023
//   char passStr[TSDB_PASSWORD_LEN] = {0};
2024
//
2025
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
2026
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
2027
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
2028
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
2029
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
2030
// }
2031

2032
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
2,147,483,647✔
2033
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,147,483,647✔
2034
    SResultColumn* pCol = &pResultInfo->pCol[i];
2,147,483,647✔
2035

2036
    int32_t type = pResultInfo->fields[i].type;
2,147,483,647✔
2037
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->userFields[i].bytes, false);
2,147,483,647✔
2038

2039
    if (IS_VAR_DATA_TYPE(type)) {
2,147,483,647!
2040
      if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pCol->offset[pResultInfo->current] != -1) {
2,147,483,647!
2041
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
2,147,483,647✔
2042

2043
        if (IS_STR_DATA_BLOB(type)) {
2,147,483,647✔
2044
          pResultInfo->length[i] = blobDataLen(pStart);
31,484✔
2045
          pResultInfo->row[i] = blobDataVal(pStart);
23,172✔
2046
        } else {
2047
          pResultInfo->length[i] = varDataLen(pStart);
2,147,483,647✔
2048
          pResultInfo->row[i] = varDataVal(pStart);
2,147,483,647✔
2049
        }
2050
      } else {
2051
        pResultInfo->row[i] = NULL;
618,914,000✔
2052
        pResultInfo->length[i] = 0;
619,008,985✔
2053
      }
2054
    } else {
2055
      if (!colDataIsNull_f(pCol, pResultInfo->current)) {
2,147,483,647!
2056
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + schemaBytes * pResultInfo->current;
2,147,483,647✔
2057
        pResultInfo->length[i] = schemaBytes;
2,147,483,647✔
2058
      } else {
2059
        pResultInfo->row[i] = NULL;
2,147,483,647✔
2060
        pResultInfo->length[i] = 0;
2,147,483,647✔
2061
      }
2062
    }
2063
  }
2064
}
2,147,483,647✔
2065

2066
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
×
2067
  if (pRequest == NULL) {
×
2068
    return NULL;
×
2069
  }
2070

2071
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
×
2072
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
×
2073
    // All data has returned to App already, no need to try again
2074
    if (pResultInfo->completed) {
×
2075
      pResultInfo->numOfRows = 0;
×
2076
      return NULL;
×
2077
    }
2078

2079
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
×
2080
    SSchedulerReq   req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
×
2081

2082
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
×
2083
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2084
      pResultInfo->numOfRows = 0;
×
2085
      return NULL;
×
2086
    }
2087

2088
    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData,
×
2089
                                           convertUcs4, pRequest->stmtBindVersion > 0);
×
2090
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2091
      pResultInfo->numOfRows = 0;
×
2092
      return NULL;
×
2093
    }
2094

2095
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
×
2096
             ", complete:%d, QID:0x%" PRIx64,
2097
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
2098

2099
    STscObj*            pTscObj = pRequest->pTscObj;
×
2100
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
×
2101
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
×
2102

2103
    if (pResultInfo->numOfRows == 0) {
×
2104
      return NULL;
×
2105
    }
2106
  }
2107

2108
  if (setupOneRowPtr) {
×
2109
    doSetOneRowPtr(pResultInfo);
×
2110
    pResultInfo->current += 1;
×
2111
  }
2112

2113
  return pResultInfo->row;
×
2114
}
2115

2116
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
577,098,609✔
2117
  tsem_t* sem = param;
577,098,609✔
2118
  if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
577,098,609!
2119
    tscError("failed to post sem, code:%s", terrstr());
×
2120
  }
2121
}
577,103,193✔
2122

2123
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
2,147,483,647✔
2124
  if (pRequest == NULL) {
2,147,483,647!
2125
    return NULL;
×
2126
  }
2127

2128
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
2,147,483,647✔
2129
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
2,147,483,647✔
2130
    // All data has returned to App already, no need to try again
2131
    if (pResultInfo->completed) {
979,793,781✔
2132
      pResultInfo->numOfRows = 0;
402,713,757✔
2133
      return NULL;
402,712,622✔
2134
    }
2135

2136
    // convert ucs4 to native multi-bytes string
2137
    pResultInfo->convertUcs4 = convertUcs4;
577,095,172✔
2138
    tsem_t sem;
572,108,991✔
2139
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
577,100,935!
2140
      tscError("failed to init sem, code:%s", terrstr());
×
2141
    }
2142
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
577,099,641✔
2143
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
577,103,193!
2144
      tscError("failed to wait sem, code:%s", terrstr());
×
2145
    }
2146
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
577,102,521!
2147
      tscError("failed to destroy sem, code:%s", terrstr());
×
2148
    }
2149
    pRequest->inCallback = false;
577,103,193✔
2150
  }
2151

2152
  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
2,147,483,647✔
2153
    return NULL;
46,639,745✔
2154
  } else {
2155
    if (setupOneRowPtr) {
2,147,483,647✔
2156
      doSetOneRowPtr(pResultInfo);
2,147,483,647✔
2157
      pResultInfo->current += 1;
2,147,483,647✔
2158
    }
2159

2160
    return pResultInfo->row;
2,147,483,647✔
2161
  }
2162
}
2163

2164
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
830,804,619✔
2165
  if (pResInfo->row == NULL) {
830,804,619✔
2166
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
694,557,970!
2167
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
694,557,808!
2168
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
694,543,321!
2169
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
694,550,158!
2170

2171
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
694,549,120✔
2172
      taosMemoryFree(pResInfo->row);
17,677!
2173
      taosMemoryFree(pResInfo->pCol);
×
2174
      taosMemoryFree(pResInfo->length);
×
2175
      taosMemoryFree(pResInfo->convertBuf);
×
2176
      return terrno;
×
2177
    }
2178
  }
2179

2180
  return TSDB_CODE_SUCCESS;
830,799,965✔
2181
}
2182

2183
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
830,312,003✔
2184
  int32_t idx = -1;
830,312,003✔
2185
  iconv_t conv = taosAcquireConv(&idx, C2M, pResultInfo->charsetCxt);
830,313,568✔
2186
  if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
830,303,998!
2187

2188
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,147,483,647✔
2189
    int32_t type = pResultInfo->fields[i].type;
2,147,483,647✔
2190
    int32_t schemaBytes =
2191
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
2,147,483,647✔
2192

2193
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
2,147,483,647✔
2194
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
166,669,856!
2195
      if (p == NULL) {
166,669,856!
2196
        taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
×
2197
        return terrno;
×
2198
      }
2199

2200
      pResultInfo->convertBuf[i] = p;
166,669,856✔
2201

2202
      SResultColumn* pCol = &pResultInfo->pCol[i];
166,669,856✔
2203
      for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
2,147,483,647✔
2204
        if (pCol->offset[j] != -1) {
2,147,483,647✔
2205
          char* pStart = pCol->offset[j] + pCol->pData;
2,147,483,647✔
2206

2207
          int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p), conv);
2,147,483,647✔
2208
          if (len < 0 || len > schemaBytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
2,147,483,647!
2209
            tscError(
11,310✔
2210
                "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
2211
                "colLength[i]):%p",
2212
                len, schemaBytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
2213
            taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
11,310✔
2214
            return TSDB_CODE_TSC_INTERNAL_ERROR;
67✔
2215
          }
2216

2217
          varDataSetLen(p, len);
2,147,483,647✔
2218
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
2,147,483,647✔
2219
          p += (len + VARSTR_HEADER_SIZE);
2,147,483,647✔
2220
        }
2221
      }
2222

2223
      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
166,668,331✔
2224
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
166,668,331✔
2225
    }
2226
  }
2227
  taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
830,308,705✔
2228
  return TSDB_CODE_SUCCESS;
830,314,919✔
2229
}
2230

2231
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
830,304,791✔
2232
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,147,483,647✔
2233
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + i;
2,147,483,647✔
2234
    TAOS_FIELD*   pField = pResultInfo->userFields + i;
2,147,483,647✔
2235
    int32_t       type = pFieldE->type;
2,147,483,647✔
2236
    int32_t       bufLen = 0;
2,147,483,647✔
2237
    char*         p = NULL;
2,147,483,647✔
2238
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[i].pData) {
2,147,483,647!
2239
      continue;
2,147,483,647✔
2240
    } else {
2241
      bufLen = 64;
1,981,489✔
2242
      p = taosMemoryRealloc(pResultInfo->convertBuf[i], bufLen * pResultInfo->numOfRows);
1,981,489!
2243
      pFieldE->bytes = bufLen;
1,981,489✔
2244
      pField->bytes = bufLen;
1,981,489✔
2245
    }
2246
    if (!p) return terrno;
1,981,489!
2247
    pResultInfo->convertBuf[i] = p;
1,981,489✔
2248

2249
    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
1,531,588,199✔
2250
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[i].pData + j * tDataTypes[type].bytes), type,
1,529,606,710✔
2251
                                  pFieldE->precision, pFieldE->scale, p, bufLen);
1,529,606,710✔
2252
      p += bufLen;
1,529,606,710✔
2253
      if (TSDB_CODE_SUCCESS != code) {
1,529,606,710!
2254
        return code;
×
2255
      }
2256
    }
2257
    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
1,981,489✔
2258
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
1,981,489✔
2259
  }
2260
  return 0;
830,280,337✔
2261
}
2262

2263
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
1,157,670✔
2264
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t) +
2,314,344✔
2265
         numOfCols * (sizeof(int8_t) + sizeof(int32_t));
1,156,674✔
2266
}
2267

2268
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
578,835✔
2269
  char*   p = (char*)pResultInfo->pData;
578,835✔
2270
  int32_t blockVersion = *(int32_t*)p;
578,835✔
2271

2272
  int32_t numOfRows = pResultInfo->numOfRows;
578,835✔
2273
  int32_t numOfCols = pResultInfo->numOfCols;
578,835✔
2274

2275
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2276
  // length |
2277
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
578,835✔
2278
  if (numOfCols != cols) {
578,835!
2279
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2280
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2281
  }
2282

2283
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
578,835✔
2284
  int32_t* colLength = (int32_t*)(p + len);
578,835✔
2285
  len += sizeof(int32_t) * numOfCols;
578,835✔
2286

2287
  char* pStart = p + len;
578,835✔
2288
  for (int32_t i = 0; i < numOfCols; ++i) {
2,620,606✔
2289
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
2,041,771!
2290

2291
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,041,771✔
2292
      int32_t* offset = (int32_t*)pStart;
738,729✔
2293
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
738,729✔
2294
      len += lenTmp;
738,729✔
2295
      pStart += lenTmp;
738,729✔
2296

2297
      int32_t estimateColLen = 0;
738,729✔
2298
      for (int32_t j = 0; j < numOfRows; ++j) {
4,835,510✔
2299
        if (offset[j] == -1) {
4,096,781✔
2300
          continue;
168,328✔
2301
        }
2302
        char* data = offset[j] + pStart;
3,928,453✔
2303

2304
        int32_t jsonInnerType = *data;
3,928,453✔
2305
        char*   jsonInnerData = data + CHAR_BYTES;
3,928,453✔
2306
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
3,928,453✔
2307
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
41,418✔
2308
        } else if (tTagIsJson(data)) {
3,887,035✔
2309
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
640,080✔
2310
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
3,246,955✔
2311
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
3,076,681✔
2312
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
170,274✔
2313
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
124,254✔
2314
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
46,020!
2315
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
46,020✔
2316
        } else if (IS_STR_DATA_BLOB(jsonInnerType)) {
×
2317
          estimateColLen += (BLOBSTR_HEADER_SIZE + 32);
×
2318
        } else {
2319
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
2320
          return -1;
×
2321
        }
2322
      }
2323
      len += TMAX(colLen, estimateColLen);
738,729✔
2324
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,303,042!
2325
      int32_t lenTmp = numOfRows * sizeof(int32_t);
227,799✔
2326
      len += (lenTmp + colLen);
227,799✔
2327
      pStart += lenTmp;
227,799✔
2328
    } else {
2329
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
1,075,243✔
2330
      len += (lenTmp + colLen);
1,075,243✔
2331
      pStart += lenTmp;
1,075,243✔
2332
    }
2333
    pStart += colLen;
2,041,771✔
2334
  }
2335

2336
  // Ensure the complete structure of the block, including the blankfill field,
2337
  // even though it is not used on the client side.
2338
  len += sizeof(bool);
578,835✔
2339
  return len;
578,835✔
2340
}
2341

2342
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
830,799,147✔
2343
  int32_t numOfRows = pResultInfo->numOfRows;
830,799,147✔
2344
  int32_t numOfCols = pResultInfo->numOfCols;
830,804,503✔
2345
  bool    needConvert = false;
830,810,680✔
2346
  for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
2347
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,147,483,647✔
2348
      needConvert = true;
578,835✔
2349
      break;
578,835✔
2350
    }
2351
  }
2352

2353
  if (!needConvert) {
830,812,671✔
2354
    return TSDB_CODE_SUCCESS;
830,233,942✔
2355
  }
2356

2357
  tscDebug("start to convert form json format string");
578,742✔
2358

2359
  char*   p = (char*)pResultInfo->pData;
578,742✔
2360
  int32_t blockVersion = *(int32_t*)p;
578,742✔
2361
  int32_t dataLen = estimateJsonLen(pResultInfo);
578,742✔
2362
  if (dataLen <= 0) {
578,835!
2363
    tscError("doConvertJson error: estimateJsonLen failed");
×
2364
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2365
  }
2366

2367
  taosMemoryFreeClear(pResultInfo->convertJson);
578,835!
2368
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
578,835!
2369
  if (pResultInfo->convertJson == NULL) return terrno;
578,835!
2370
  char* p1 = pResultInfo->convertJson;
578,835✔
2371

2372
  int32_t totalLen = 0;
578,835✔
2373
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
578,835✔
2374
  if (numOfCols != cols) {
578,835!
2375
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2376
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2377
  }
2378

2379
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
578,835✔
2380
  (void)memcpy(p1, p, len);
578,835!
2381

2382
  p += len;
578,835✔
2383
  p1 += len;
578,835✔
2384
  totalLen += len;
578,835✔
2385

2386
  len = sizeof(int32_t) * numOfCols;
578,835✔
2387
  int32_t* colLength = (int32_t*)p;
578,835✔
2388
  int32_t* colLength1 = (int32_t*)p1;
578,835✔
2389
  (void)memcpy(p1, p, len);
578,835!
2390
  p += len;
578,835✔
2391
  p1 += len;
578,835✔
2392
  totalLen += len;
578,835✔
2393

2394
  char* pStart = p;
578,835✔
2395
  char* pStart1 = p1;
578,835✔
2396
  for (int32_t i = 0; i < numOfCols; ++i) {
2,620,606✔
2397
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
2,041,771!
2398
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
2,041,771!
2399
    if (colLen >= dataLen) {
2,041,771!
2400
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
×
2401
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2402
    }
2403
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,041,771✔
2404
      int32_t* offset = (int32_t*)pStart;
738,729✔
2405
      int32_t* offset1 = (int32_t*)pStart1;
738,729✔
2406
      len = numOfRows * sizeof(int32_t);
738,729✔
2407
      (void)memcpy(pStart1, pStart, len);
738,729!
2408
      pStart += len;
738,729✔
2409
      pStart1 += len;
738,729✔
2410
      totalLen += len;
738,729✔
2411

2412
      len = 0;
738,729✔
2413
      for (int32_t j = 0; j < numOfRows; ++j) {
4,835,510✔
2414
        if (offset[j] == -1) {
4,096,781✔
2415
          continue;
168,328✔
2416
        }
2417
        char* data = offset[j] + pStart;
3,928,453✔
2418

2419
        int32_t jsonInnerType = *data;
3,928,453✔
2420
        char*   jsonInnerData = data + CHAR_BYTES;
3,928,453✔
2421
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
3,928,453✔
2422
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
3,928,453✔
2423
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
41,418✔
2424
          varDataSetLen(dst, strlen(varDataVal(dst)));
41,418!
2425
        } else if (tTagIsJson(data)) {
3,887,035✔
2426
          char* jsonString = NULL;
640,080✔
2427
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
640,080✔
2428
          if (jsonString == NULL) {
640,080!
2429
            tscError("doConvertJson error: parseTagDatatoJson failed");
×
2430
            return terrno;
×
2431
          }
2432
          STR_TO_VARSTR(dst, jsonString);
640,080!
2433
          taosMemoryFree(jsonString);
640,080!
2434
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
3,246,955✔
2435
          *(char*)varDataVal(dst) = '\"';
3,076,681✔
2436
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
3,076,681✔
2437
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
3,076,681✔
2438
                                         varDataVal(tmp), pResultInfo->charsetCxt);
2439
          if (length <= 0) {
3,076,681✔
2440
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
2,301!
2441
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
2442
            length = 0;
2,301✔
2443
          }
2444
          int32_t escapeLength = escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - CHAR_BYTES * 2,varDataVal(tmp), length);
3,076,681✔
2445
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
3,076,681✔
2446
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
3,076,681✔
2447
          tscError("value:%s.", varDataVal(dst));
3,076,681!
2448
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
170,274✔
2449
          double jsonVd = *(double*)(jsonInnerData);
124,254✔
2450
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
124,254✔
2451
          varDataSetLen(dst, strlen(varDataVal(dst)));
124,254!
2452
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
46,020!
2453
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
46,020✔
2454
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
46,020✔
2455
          varDataSetLen(dst, strlen(varDataVal(dst)));
46,020!
2456
        } else {
2457
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
×
2458
          return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2459
        }
2460

2461
        offset1[j] = len;
3,928,453✔
2462
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
3,928,453!
2463
        len += varDataTLen(dst);
3,928,453✔
2464
      }
2465
      colLen1 = len;
738,729✔
2466
      totalLen += colLen1;
738,729✔
2467
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
738,729!
2468
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,303,042!
2469
      len = numOfRows * sizeof(int32_t);
227,799✔
2470
      (void)memcpy(pStart1, pStart, len);
227,799!
2471
      pStart += len;
227,799✔
2472
      pStart1 += len;
227,799✔
2473
      totalLen += len;
227,799✔
2474
      totalLen += colLen;
227,799✔
2475
      (void)memcpy(pStart1, pStart, colLen);
227,799!
2476
    } else {
2477
      len = BitmapLen(pResultInfo->numOfRows);
1,075,243✔
2478
      (void)memcpy(pStart1, pStart, len);
1,075,243!
2479
      pStart += len;
1,075,243✔
2480
      pStart1 += len;
1,075,243✔
2481
      totalLen += len;
1,075,243✔
2482
      totalLen += colLen;
1,075,243✔
2483
      (void)memcpy(pStart1, pStart, colLen);
1,075,243!
2484
    }
2485
    pStart += colLen;
2,041,771✔
2486
    pStart1 += colLen1;
2,041,771✔
2487
  }
2488

2489
  // Ensure the complete structure of the block, including the blankfill field,
2490
  // even though it is not used on the client side.
2491
  // (void)memcpy(pStart1, pStart, sizeof(bool));
2492
  totalLen += sizeof(bool);
578,835✔
2493

2494
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
578,835✔
2495
  pResultInfo->pData = pResultInfo->convertJson;
578,835✔
2496
  return TSDB_CODE_SUCCESS;
578,835✔
2497
}
2498

2499
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
878,530,851✔
2500
  bool convertForDecimal = convertUcs4;
878,530,851✔
2501
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
878,530,851✔
2502
    tscError("setResultDataPtr paras error");
9,697!
2503
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2504
  }
2505

2506
  if (pResultInfo->numOfRows == 0) {
878,528,537✔
2507
    return TSDB_CODE_SUCCESS;
47,714,139✔
2508
  }
2509

2510
  if (pResultInfo->pData == NULL) {
830,811,068!
2511
    tscError("setResultDataPtr error: pData is NULL");
×
2512
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2513
  }
2514

2515
  int32_t code = doPrepareResPtr(pResultInfo);
830,799,501✔
2516
  if (code != TSDB_CODE_SUCCESS) {
830,797,915!
2517
    return code;
×
2518
  }
2519
  code = doConvertJson(pResultInfo);
830,797,915✔
2520
  if (code != TSDB_CODE_SUCCESS) {
830,791,121!
2521
    return code;
×
2522
  }
2523

2524
  char* p = (char*)pResultInfo->pData;
830,791,121✔
2525

2526
  // version:
2527
  int32_t blockVersion = *(int32_t*)p;
830,794,306✔
2528
  p += sizeof(int32_t);
830,802,424✔
2529

2530
  int32_t dataLen = *(int32_t*)p;
830,808,208✔
2531
  p += sizeof(int32_t);
830,809,101✔
2532

2533
  int32_t rows = *(int32_t*)p;
830,808,656✔
2534
  p += sizeof(int32_t);
830,810,887✔
2535

2536
  int32_t cols = *(int32_t*)p;
830,814,561✔
2537
  p += sizeof(int32_t);
830,808,951✔
2538

2539
  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
830,810,038✔
2540
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
4,993!
2541
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
2542
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2543
  }
2544

2545
  int32_t hasColumnSeg = *(int32_t*)p;
830,801,899✔
2546
  p += sizeof(int32_t);
830,795,132✔
2547

2548
  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)p);
830,811,813✔
2549
  p += sizeof(uint64_t);
830,811,813✔
2550

2551
  // check fields
2552
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,147,483,647✔
2553
    int8_t type = *(int8_t*)p;
2,147,483,647✔
2554
    p += sizeof(int8_t);
2,147,483,647✔
2555

2556
    int32_t bytes = *(int32_t*)p;
2,147,483,647✔
2557
    p += sizeof(int32_t);
2,147,483,647✔
2558

2559
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
2,147,483,647!
2560
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
×
2561
    }
2562
  }
2563

2564
  int32_t* colLength = (int32_t*)p;
830,810,538✔
2565
  p += sizeof(int32_t) * pResultInfo->numOfCols;
830,810,538✔
2566

2567
  char* pStart = p;
830,814,250✔
2568
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,147,483,647✔
2569
    if ((pStart - pResultInfo->pData) >= dataLen) {
2,147,483,647!
2570
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
×
2571
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2572
    }
2573
    if (blockVersion == BLOCK_VERSION_1) {
2,147,483,647✔
2574
      colLength[i] = htonl(colLength[i]);
2,037,594,245✔
2575
    }
2576
    if (colLength[i] >= dataLen) {
2,147,483,647!
2577
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
×
2578
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2579
    }
2580
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
2,147,483,647!
2581
      tscError("invalid type %d", pResultInfo->fields[i].type);
3,532!
2582
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2583
    }
2584
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
2,147,483,647✔
2585
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
862,970,134✔
2586
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
862,991,266✔
2587
    } else {
2588
      pResultInfo->pCol[i].nullbitmap = pStart;
2,147,483,647✔
2589
      pStart += BitmapLen(pResultInfo->numOfRows);
2,147,483,647✔
2590
    }
2591

2592
    pResultInfo->pCol[i].pData = pStart;
2,147,483,647✔
2593
    pResultInfo->length[i] =
2,147,483,647✔
2594
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
2,147,483,647✔
2595
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
2,147,483,647✔
2596

2597
    pStart += colLength[i];
2,147,483,647✔
2598
  }
2599

2600
  p = pStart;
830,819,223✔
2601
  // bool blankFill = *(bool*)p;
2602
  p += sizeof(bool);
830,819,223✔
2603
  int32_t offset = p - pResultInfo->pData;
830,820,380✔
2604
  if (offset > dataLen) {
830,811,776!
2605
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
×
2606
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2607
  }
2608

2609
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
2610
  if (convertUcs4) {
830,811,776✔
2611
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
830,314,622✔
2612
  }
2613
#endif
2614
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
830,813,926✔
2615
    code = convertDecimalType(pResultInfo);
830,316,705✔
2616
  }
2617
  return code;
830,804,333✔
2618
}
2619

2620
char* getDbOfConnection(STscObj* pObj) {
2,147,483,647✔
2621
  terrno = TSDB_CODE_SUCCESS;
2,147,483,647✔
2622
  char* p = NULL;
2,147,483,647✔
2623
  (void)taosThreadMutexLock(&pObj->mutex);
2,147,483,647✔
2624
  size_t len = strlen(pObj->db);
2,147,483,647!
2625
  if (len > 0) {
2,147,483,647✔
2626
    p = taosStrndup(pObj->db, tListLen(pObj->db));
2,147,483,647!
2627
    if (p == NULL) {
2,147,483,647!
2628
      tscError("failed to taosStrndup db name");
×
2629
    }
2630
  }
2631

2632
  (void)taosThreadMutexUnlock(&pObj->mutex);
2,147,483,647✔
2633
  return p;
2,147,483,647✔
2634
}
2635

2636
void setConnectionDB(STscObj* pTscObj, const char* db) {
8,484,260✔
2637
  if (db == NULL || pTscObj == NULL) {
8,484,260!
2638
    tscError("setConnectionDB para is NULL");
×
2639
    return;
×
2640
  }
2641

2642
  (void)taosThreadMutexLock(&pTscObj->mutex);
8,486,055✔
2643
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
8,486,055!
2644
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
8,486,055✔
2645
}
2646

2647
void resetConnectDB(STscObj* pTscObj) {
×
2648
  if (pTscObj == NULL) {
×
2649
    return;
×
2650
  }
2651

2652
  (void)taosThreadMutexLock(&pTscObj->mutex);
×
2653
  pTscObj->db[0] = 0;
×
2654
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
×
2655
}
2656

2657
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
605,867,686✔
2658
                              bool isStmt) {
2659
  if (pResultInfo == NULL || pRsp == NULL) {
605,867,686!
2660
    tscError("setQueryResultFromRsp paras is null");
485!
2661
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2662
  }
2663

2664
  taosMemoryFreeClear(pResultInfo->pRspMsg);
605,867,201!
2665
  pResultInfo->pRspMsg = (const char*)pRsp;
605,867,223✔
2666
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
605,868,358✔
2667
  pResultInfo->current = 0;
605,868,358✔
2668
  pResultInfo->completed = (pRsp->completed == 1);
605,868,358✔
2669
  pResultInfo->precision = pRsp->precision;
605,866,066✔
2670

2671
  // decompress data if needed
2672
  int32_t payloadLen = htonl(pRsp->payloadLen);
605,864,732✔
2673

2674
  if (pRsp->compressed) {
605,860,616!
2675
    if (pResultInfo->decompBuf == NULL) {
×
2676
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
×
2677
      if (pResultInfo->decompBuf == NULL) {
×
2678
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2679
        return terrno;
×
2680
      }
2681
      pResultInfo->decompBufSize = payloadLen;
×
2682
    } else {
2683
      if (pResultInfo->decompBufSize < payloadLen) {
×
2684
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
×
2685
        if (p == NULL) {
×
2686
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2687
          return terrno;
×
2688
        }
2689

2690
        pResultInfo->decompBuf = p;
×
2691
        pResultInfo->decompBufSize = payloadLen;
×
2692
      }
2693
    }
2694
  }
2695

2696
  if (payloadLen > 0) {
605,866,044✔
2697
    int32_t compLen = *(int32_t*)pRsp->data;
558,157,617✔
2698
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
558,156,906✔
2699

2700
    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
558,151,699✔
2701

2702
    if (pRsp->compressed && compLen < rawLen) {
558,156,283!
2703
      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
2704
      if (len < 0) {
×
2705
        tscError("tsDecompressString failed");
×
2706
        return terrno ? terrno : TSDB_CODE_FAILED;
×
2707
      }
2708
      if (len != rawLen) {
×
2709
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
×
2710
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2711
      }
2712
      pResultInfo->pData = pResultInfo->decompBuf;
×
2713
      pResultInfo->payloadLen = rawLen;
×
2714
    } else {
2715
      pResultInfo->pData = pStart;
558,155,126✔
2716
      pResultInfo->payloadLen = htonl(pRsp->compLen);
558,157,617✔
2717
      if (pRsp->compLen != pRsp->payloadLen) {
558,151,898!
2718
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
×
2719
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2720
      }
2721
    }
2722
  }
2723

2724
  // TODO handle the compressed case
2725
  pResultInfo->totalRows += pResultInfo->numOfRows;
605,860,594✔
2726

2727
  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
605,866,313✔
2728
  return code;
605,858,952✔
2729
}
2730

2731
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
4,250✔
2732
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
4,250✔
2733
  void*              clientRpc = NULL;
4,250✔
2734
  SServerStatusRsp   statusRsp = {0};
4,250✔
2735
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
4,250✔
2736
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
4,250✔
2737
  SRpcMsg  rpcRsp = {0};
4,250✔
2738
  SRpcInit rpcInit = {0};
4,250✔
2739
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
4,250✔
2740

2741
  rpcInit.label = "CHK";
4,250✔
2742
  rpcInit.numOfThreads = 1;
4,250✔
2743
  rpcInit.cfp = NULL;
4,250✔
2744
  rpcInit.sessions = 16;
4,250✔
2745
  rpcInit.connType = TAOS_CONN_CLIENT;
4,250✔
2746
  rpcInit.idleTime = tsShellActivityTimer * 1000;
4,250✔
2747
  rpcInit.compressSize = tsCompressMsgSize;
4,250✔
2748
  rpcInit.user = "_dnd";
4,250✔
2749

2750
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
4,250!
2751
  connLimitNum = TMAX(connLimitNum, 10);
4,250✔
2752
  connLimitNum = TMIN(connLimitNum, 500);
4,250✔
2753
  rpcInit.connLimitNum = connLimitNum;
4,250✔
2754
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
4,250✔
2755
  rpcInit.readTimeout = tsReadTimeout;
4,250✔
2756
  rpcInit.ipv6 = tsEnableIpv6;
4,250✔
2757
  rpcInit.enableSSL = tsEnableTLS;
4,250✔
2758

2759
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
4,250!
2760
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
4,250!
2761
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
4,250!
2762
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
4,250!
2763
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
4,250!
2764

2765
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
4,250!
2766
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2767
    goto _OVER;
×
2768
  }
2769

2770
  clientRpc = rpcOpen(&rpcInit);
4,250✔
2771
  if (clientRpc == NULL) {
4,250!
2772
    code = terrno;
×
2773
    tscError("failed to init server status client since %s", tstrerror(code));
×
2774
    goto _OVER;
×
2775
  }
2776

2777
  if (fqdn == NULL) {
4,250!
2778
    fqdn = tsLocalFqdn;
4,250✔
2779
  }
2780

2781
  if (port == 0) {
4,250!
2782
    port = tsServerPort;
4,250✔
2783
  }
2784

2785
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
4,250!
2786
  epSet.eps[0].port = (uint16_t)port;
4,250✔
2787
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
4,250✔
2788
  if (TSDB_CODE_SUCCESS != ret) {
4,250!
2789
    tscError("failed to send recv since %s", tstrerror(ret));
×
2790
    goto _OVER;
×
2791
  }
2792

2793
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
4,250!
2794
    tscError("failed to send server status req since %s", terrstr());
963!
2795
    goto _OVER;
963✔
2796
  }
2797

2798
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
3,287!
2799
    tscError("failed to parse server status rsp since %s", terrstr());
×
2800
    goto _OVER;
×
2801
  }
2802

2803
  code = statusRsp.statusCode;
3,287✔
2804
  if (details != NULL) {
3,287!
2805
    tstrncpy(details, statusRsp.details, maxlen);
3,287!
2806
  }
2807

2808
_OVER:
4,129✔
2809
  if (clientRpc != NULL) {
4,250!
2810
    rpcClose(clientRpc);
4,250✔
2811
  }
2812
  if (rpcRsp.pCont != NULL) {
4,250✔
2813
    rpcFreeCont(rpcRsp.pCont);
3,287✔
2814
  }
2815
  return code;
4,250✔
2816
}
2817

2818
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
3,070✔
2819
                      int32_t acctId, char* db) {
2820
  SName name = {0};
3,070✔
2821

2822
  if (len1 <= 0) {
3,070!
2823
    return -1;
×
2824
  }
2825

2826
  const char* dbName = db;
3,070✔
2827
  const char* tbName = NULL;
3,070✔
2828
  int32_t     dbLen = 0;
3,070✔
2829
  int32_t     tbLen = 0;
3,070✔
2830
  if (len2 > 0) {
3,070!
2831
    dbName = str + pos1;
×
2832
    dbLen = len1;
×
2833
    tbName = str + pos2;
×
2834
    tbLen = len2;
×
2835
  } else {
2836
    dbLen = strlen(db);
3,070!
2837
    tbName = str + pos1;
3,070✔
2838
    tbLen = len1;
3,070✔
2839
  }
2840

2841
  if (dbLen <= 0 || tbLen <= 0) {
3,070!
2842
    return -1;
×
2843
  }
2844

2845
  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
3,070!
2846
    return -1;
×
2847
  }
2848

2849
  if (tNameAddTbName(&name, tbName, tbLen)) {
3,070!
2850
    return -1;
×
2851
  }
2852

2853
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
3,070✔
2854
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);
3,070✔
2855

2856
  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
3,070✔
2857
  if (pDb) {
3,070!
2858
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
×
2859
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
2860
    }
2861
  } else {
2862
    STablesReq db;
3,070✔
2863
    db.pTables = taosArrayInit(20, sizeof(SName));
3,070✔
2864
    if (NULL == db.pTables) {
3,070!
2865
      return terrno;
×
2866
    }
2867
    tstrncpy(db.dbFName, dbFName, TSDB_DB_FNAME_LEN);
3,070✔
2868
    if (NULL == taosArrayPush(db.pTables, &name)) {
6,140!
2869
      return terrno;
×
2870
    }
2871
    TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
3,070!
2872
  }
2873

2874
  return TSDB_CODE_SUCCESS;
3,070✔
2875
}
2876

2877
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
3,070✔
2878
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
3,070✔
2879
  if (NULL == pHash) {
3,070!
2880
    return terrno;
×
2881
  }
2882

2883
  bool    inEscape = false;
3,070✔
2884
  int32_t code = 0;
3,070✔
2885
  void*   pIter = NULL;
3,070✔
2886

2887
  int32_t vIdx = 0;
3,070✔
2888
  int32_t vPos[2];
3,070✔
2889
  int32_t vLen[2];
3,070✔
2890

2891
  (void)memset(vPos, -1, sizeof(vPos));
3,070✔
2892
  (void)memset(vLen, 0, sizeof(vLen));
3,070✔
2893

2894
  for (int32_t i = 0;; ++i) {
15,350✔
2895
    if (0 == *(tbList + i)) {
15,350✔
2896
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
3,070!
2897
        vLen[vIdx] = i - vPos[vIdx];
3,070✔
2898
      }
2899

2900
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
3,070✔
2901
      if (code) {
3,070!
2902
        goto _return;
×
2903
      }
2904

2905
      break;
3,070✔
2906
    }
2907

2908
    if ('`' == *(tbList + i)) {
12,280!
2909
      inEscape = !inEscape;
×
2910
      if (!inEscape) {
×
2911
        if (vPos[vIdx] >= 0) {
×
2912
          vLen[vIdx] = i - vPos[vIdx];
×
2913
        } else {
2914
          goto _return;
×
2915
        }
2916
      }
2917

2918
      continue;
×
2919
    }
2920

2921
    if (inEscape) {
12,280!
2922
      if (vPos[vIdx] < 0) {
×
2923
        vPos[vIdx] = i;
×
2924
      }
2925
      continue;
×
2926
    }
2927

2928
    if ('.' == *(tbList + i)) {
12,280!
2929
      if (vPos[vIdx] < 0) {
×
2930
        goto _return;
×
2931
      }
2932
      if (vLen[vIdx] <= 0) {
×
2933
        vLen[vIdx] = i - vPos[vIdx];
×
2934
      }
2935
      vIdx++;
×
2936
      if (vIdx >= 2) {
×
2937
        goto _return;
×
2938
      }
2939
      continue;
×
2940
    }
2941

2942
    if (',' == *(tbList + i)) {
12,280!
2943
      if (vPos[vIdx] < 0) {
×
2944
        goto _return;
×
2945
      }
2946
      if (vLen[vIdx] <= 0) {
×
2947
        vLen[vIdx] = i - vPos[vIdx];
×
2948
      }
2949

2950
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
×
2951
      if (code) {
×
2952
        goto _return;
×
2953
      }
2954

2955
      (void)memset(vPos, -1, sizeof(vPos));
×
2956
      (void)memset(vLen, 0, sizeof(vLen));
×
2957
      vIdx = 0;
×
2958
      continue;
×
2959
    }
2960

2961
    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
12,280!
2962
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
×
2963
        vLen[vIdx] = i - vPos[vIdx];
×
2964
      }
2965
      continue;
×
2966
    }
2967

2968
    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
12,280!
2969
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
1,535!
2970
      if (vLen[vIdx] > 0) {
12,280!
2971
        goto _return;
×
2972
      }
2973
      if (vPos[vIdx] < 0) {
12,280✔
2974
        vPos[vIdx] = i;
3,070✔
2975
      }
2976
      continue;
12,280✔
2977
    }
2978

2979
    goto _return;
×
2980
  }
2981

2982
  int32_t dbNum = taosHashGetSize(pHash);
3,070✔
2983
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
3,070✔
2984
  if (NULL == pReq) {
3,070!
2985
    TSC_ERR_JRET(terrno);
×
2986
  }
2987
  pIter = taosHashIterate(pHash, NULL);
3,070✔
2988
  while (pIter) {
6,140✔
2989
    STablesReq* pDb = (STablesReq*)pIter;
3,070✔
2990
    if (NULL == taosArrayPush(*pReq, pDb)) {
6,140!
2991
      TSC_ERR_JRET(terrno);
×
2992
    }
2993
    pIter = taosHashIterate(pHash, pIter);
3,070✔
2994
  }
2995

2996
  taosHashCleanup(pHash);
3,070✔
2997

2998
  return TSDB_CODE_SUCCESS;
3,070✔
2999

3000
_return:
×
3001

3002
  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
×
3003

3004
  pIter = taosHashIterate(pHash, NULL);
×
3005
  while (pIter) {
×
3006
    STablesReq* pDb = (STablesReq*)pIter;
×
3007
    taosArrayDestroy(pDb->pTables);
×
3008
    pIter = taosHashIterate(pHash, pIter);
×
3009
  }
3010

3011
  taosHashCleanup(pHash);
×
3012

3013
  return terrno;
×
3014
}
3015

3016
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
3,070✔
3017
  SSyncQueryParam* pParam = param;
3,070✔
3018
  pParam->pRequest->code = code;
3,070✔
3019

3020
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
3,070!
3021
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3022
  }
3023
}
3,070✔
3024

3025
void syncQueryFn(void* param, void* res, int32_t code) {
2,147,483,647✔
3026
  SSyncQueryParam* pParam = param;
2,147,483,647✔
3027
  pParam->pRequest = res;
2,147,483,647✔
3028

3029
  if (pParam->pRequest) {
2,147,483,647✔
3030
    pParam->pRequest->code = code;
2,147,483,647✔
3031
    clientOperateReport(pParam->pRequest);
2,147,483,647✔
3032
  }
3033

3034
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
2,147,483,647!
3035
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3036
  }
3037
}
2,147,483,647✔
3038

3039
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
2,147,483,647✔
3040
                        int8_t source) {
3041
  if (sql == NULL || NULL == fp) {
2,147,483,647✔
3042
    terrno = TSDB_CODE_INVALID_PARA;
4,184✔
3043
    if (fp) {
×
3044
      fp(param, NULL, terrno);
×
3045
    }
3046

3047
    return;
×
3048
  }
3049

3050
  size_t sqlLen = strlen(sql);
2,147,483,647!
3051
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
2,147,483,647✔
3052
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, TSDB_MAX_ALLOWED_SQL_LEN);
1,272!
3053
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
1,272✔
3054
    fp(param, NULL, terrno);
1,272✔
3055
    return;
1,272✔
3056
  }
3057

3058
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);
2,147,483,647✔
3059

3060
  SRequestObj* pRequest = NULL;
2,147,483,647✔
3061
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
2,147,483,647✔
3062
  if (code != TSDB_CODE_SUCCESS) {
2,147,483,647!
3063
    terrno = code;
×
3064
    fp(param, NULL, terrno);
×
3065
    return;
×
3066
  }
3067

3068
  pRequest->source = source;
2,147,483,647✔
3069
  pRequest->body.queryFp = fp;
2,147,483,647✔
3070
  doAsyncQuery(pRequest, false);
2,147,483,647✔
3071
}
3072

3073
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
1,239✔
3074
                                 int64_t reqid) {
3075
  if (sql == NULL || NULL == fp) {
1,239!
3076
    terrno = TSDB_CODE_INVALID_PARA;
×
3077
    if (fp) {
×
3078
      fp(param, NULL, terrno);
×
3079
    }
3080

3081
    return;
×
3082
  }
3083

3084
  size_t sqlLen = strlen(sql);
1,239!
3085
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
1,239!
3086
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid,
×
3087
             TSDB_MAX_ALLOWED_SQL_LEN);
3088
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
3089
    fp(param, NULL, terrno);
×
3090
    return;
×
3091
  }
3092

3093
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);
1,239!
3094

3095
  SRequestObj* pRequest = NULL;
1,239✔
3096
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
1,239✔
3097
  if (code != TSDB_CODE_SUCCESS) {
1,239!
3098
    terrno = code;
×
3099
    fp(param, NULL, terrno);
×
3100
    return;
×
3101
  }
3102

3103
  pRequest->body.queryFp = fp;
1,239✔
3104
  doAsyncQuery(pRequest, false);
1,239✔
3105
}
3106

3107
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
2,147,483,647✔
3108
  if (NULL == taos) {
2,147,483,647!
3109
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3110
    return NULL;
×
3111
  }
3112

3113
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
2,147,483,647!
3114
  if (NULL == param) {
2,147,483,647!
3115
    return NULL;
×
3116
  }
3117
  int32_t code = tsem_init(&param->sem, 0, 0);
2,147,483,647✔
3118
  if (TSDB_CODE_SUCCESS != code) {
2,147,483,647!
3119
    taosMemoryFree(param);
×
3120
    return NULL;
×
3121
  }
3122

3123
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
2,147,483,647✔
3124
  code = tsem_wait(&param->sem);
2,147,483,647✔
3125
  if (TSDB_CODE_SUCCESS != code) {
2,147,483,647!
3126
    taosMemoryFree(param);
×
3127
    return NULL;
×
3128
  }
3129
  code = tsem_destroy(&param->sem);
2,147,483,647✔
3130
  if (TSDB_CODE_SUCCESS != code) {
2,147,483,647!
3131
    tscError("failed to destroy semaphore since %s", tstrerror(code));
×
3132
  }
3133

3134
  SRequestObj* pRequest = NULL;
2,147,483,647✔
3135
  if (param->pRequest != NULL) {
2,147,483,647✔
3136
    param->pRequest->syncQuery = true;
2,147,483,647✔
3137
    pRequest = param->pRequest;
2,147,483,647✔
3138
    param->pRequest->inCallback = false;
2,147,483,647✔
3139
  }
3140
  taosMemoryFree(param);
2,147,483,647!
3141

3142
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3143
  //          *(int64_t*)taos, pRequest);
3144

3145
  return pRequest;
2,147,483,647✔
3146
}
3147

3148
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
1,239✔
3149
  if (NULL == taos) {
1,239!
3150
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3151
    return NULL;
×
3152
  }
3153

3154
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
1,239!
3155
  if (param == NULL) {
1,239!
3156
    return NULL;
×
3157
  }
3158
  int32_t code = tsem_init(&param->sem, 0, 0);
1,239✔
3159
  if (TSDB_CODE_SUCCESS != code) {
1,239!
3160
    taosMemoryFree(param);
×
3161
    return NULL;
×
3162
  }
3163

3164
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
1,239✔
3165
  code = tsem_wait(&param->sem);
1,239✔
3166
  if (TSDB_CODE_SUCCESS != code) {
1,239!
3167
    taosMemoryFree(param);
×
3168
    return NULL;
×
3169
  }
3170
  SRequestObj* pRequest = NULL;
1,239✔
3171
  if (param->pRequest != NULL) {
1,239!
3172
    param->pRequest->syncQuery = true;
1,239✔
3173
    pRequest = param->pRequest;
1,239✔
3174
  }
3175
  taosMemoryFree(param);
1,239!
3176

3177
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3178
  //   *(int64_t*)taos, pRequest);
3179

3180
  return pRequest;
1,239✔
3181
}
3182

3183
static void fetchCallback(void* pResult, void* param, int32_t code) {
592,997,296✔
3184
  SRequestObj* pRequest = (SRequestObj*)param;
592,997,296✔
3185

3186
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
592,997,296✔
3187

3188
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
592,997,296✔
3189
           tstrerror(code), pRequest->requestId);
3190

3191
  pResultInfo->pData = pResult;
592,997,296✔
3192
  pResultInfo->numOfRows = 0;
592,998,431✔
3193

3194
  if (code != TSDB_CODE_SUCCESS) {
592,988,106!
3195
    pRequest->code = code;
×
3196
    taosMemoryFreeClear(pResultInfo->pData);
×
3197
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3198
    return;
×
3199
  }
3200

3201
  if (pRequest->code != TSDB_CODE_SUCCESS) {
592,988,106!
3202
    taosMemoryFreeClear(pResultInfo->pData);
×
3203
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3204
    return;
×
3205
  }
3206

3207
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
597,991,859✔
3208
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
592,997,274!
3209
  if (pRequest->code != TSDB_CODE_SUCCESS) {
592,990,266✔
3210
    pResultInfo->numOfRows = 0;
67✔
3211
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
67!
3212
             tstrerror(pRequest->code), pRequest->requestId);
3213
  } else {
3214
    tscDebug(
592,993,408!
3215
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3216
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3217

3218
    STscObj*            pTscObj = pRequest->pTscObj;
592,994,118✔
3219
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
592,996,178✔
3220
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
592,997,335✔
3221
  }
3222

3223
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
592,995,110✔
3224
}
3225

3226
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
612,323,247✔
3227
  pRequest->body.fetchFp = fp;
612,323,247✔
3228
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
612,323,919✔
3229

3230
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
612,323,919✔
3231

3232
  // this query has no results or error exists, return directly
3233
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
612,325,054!
3234
    pResultInfo->numOfRows = 0;
1,135✔
3235
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
×
3236
    return;
1,602✔
3237
  }
3238

3239
  // all data has returned to App already, no need to try again
3240
  if (pResultInfo->completed) {
612,323,919✔
3241
    // it is a local executed query, no need to do async fetch
3242
    if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
19,326,517✔
3243
      if (pResultInfo->localResultFetched) {
7,264,236!
3244
        pResultInfo->numOfRows = 0;
3,632,118✔
3245
        pResultInfo->current = 0;
3,632,118✔
3246
      } else {
3247
        pResultInfo->localResultFetched = true;
3,632,118✔
3248
      }
3249
    } else {
3250
      pResultInfo->numOfRows = 0;
12,062,281✔
3251
    }
3252

3253
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
19,326,517✔
3254
    return;
19,326,517✔
3255
  }
3256

3257
  SSchedulerReq req = {
592,997,864✔
3258
      .syncReq = false,
3259
      .fetchFp = fetchCallback,
3260
      .cbParam = pRequest,
3261
  };
3262

3263
  int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
592,998,537✔
3264
  if (TSDB_CODE_SUCCESS != code) {
592,998,537!
3265
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
×
3266
    // pRequest->body.fetchFp(param, pRequest, code);
3267
  }
3268
}
3269

3270
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
2,147,483,647✔
3271
  pRequest->inCallback = true;
2,147,483,647✔
3272
  int64_t this = pRequest->self;
2,147,483,647✔
3273
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
2,147,483,647!
3274
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
375,450!
3275
    code = TSDB_CODE_SUCCESS;
×
3276
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
3277
  }
3278

3279
  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
2,147,483,647✔
3280
           pRequest);
3281

3282
  if (pRequest->body.queryFp != NULL) {
2,147,483,647✔
3283
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
2,147,483,647✔
3284
  }
3285

3286
  SRequestObj* pReq = acquireRequest(this);
2,147,483,647✔
3287
  if (pReq != NULL) {
2,147,483,647✔
3288
    pReq->inCallback = false;
2,147,483,647✔
3289
    (void)releaseRequest(this);
2,147,483,647✔
3290
  }
3291
}
2,147,483,647✔
3292

3293
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
1,586,742✔
3294
                       SParseSqlRes* pRes) {
3295
#ifndef TD_ENTERPRISE
3296
  return TSDB_CODE_SUCCESS;
3297
#else
3298
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
1,586,742✔
3299
#endif
3300
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc