• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4996

19 Mar 2026 02:16AM UTC coverage: 72.069% (+0.07%) from 71.996%
#4996

push

travis-ci

web-flow
feat: SQL firewall black/white list (#34798)

461 of 618 new or added lines in 4 files covered. (74.6%)

380 existing lines in 128 files now uncovered.

245359 of 340448 relevant lines covered (72.07%)

135732617.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.18
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "command.h"
22
#include "decimal.h"
23
#include "scheduler.h"
24
#include "tdatablock.h"
25
#include "tdataformat.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tmisce.h"
29
#include "tmsg.h"
30
#include "tmsgtype.h"
31
#include "tpagedbuf.h"
32
#include "tref.h"
33
#include "tsched.h"
34
#include "tversion.h"
35

36
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
37
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode);
38

39
void setQueryRequest(int64_t rId) {
631,147,276✔
40
  SRequestObj* pReq = acquireRequest(rId);
631,147,276✔
41
  if (pReq != NULL) {
631,188,260✔
42
    pReq->isQuery = true;
631,167,567✔
43
    (void)releaseRequest(rId);
631,167,569✔
44
  }
45
}
631,176,725✔
46

47
static bool stringLengthCheck(const char* str, size_t maxsize) {
166,303,692✔
48
  if (str == NULL) {
166,303,692✔
49
    return false;
×
50
  }
51

52
  size_t len = strlen(str);
166,303,692✔
53
  if (len <= 0 || len > maxsize) {
166,303,692✔
54
    return false;
4✔
55
  }
56

57
  return true;
166,305,365✔
58
}
59

60
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
82,901,606✔
61

62
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_MAX_LEN); }
82,902,192✔
63

64
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
500,654✔
65

66
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
82,903,681✔
67
  char key[512] = {0};
82,903,681✔
68
  if (user == NULL) {
82,903,681✔
69
    (void)snprintf(key, sizeof(key), "%s:%s:%d", auth, ip, port);
2,299✔
70
  } else {
71
    (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
82,901,382✔
72
  }
73
  return taosStrdup(key);
82,903,681✔
74
}
75

76
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
555,983✔
77
  if (dst == NULL || src == NULL || srcLength == 0) {
555,983✔
78
    return 0;
465✔
79
  }
80

81
  size_t escapeLength = 0;
555,518✔
82
  for (size_t i = 0; i < srcLength; ++i) {
15,792,334✔
83
    if (src[i] == '\"' || src[i] == '\\' || src[i] == '\b' || src[i] == '\f' || src[i] == '\n' || src[i] == '\r' ||
15,236,816✔
84
        src[i] == '\t') {
15,236,816✔
85
      escapeLength += 1;
×
86
    }
87
  }
88

89
  size_t dstLength = srcLength;
555,518✔
90
  if (escapeLength == 0) {
555,518✔
91
    (void)memcpy(dst, src, srcLength);
555,518✔
92
  } else {
93
    dstLength = 0;
×
94
    for (size_t i = 0; i < srcLength && dstLength <= maxDstLength; i++) {
×
95
      switch (src[i]) {
×
96
        case '\"':
×
97
          dst[dstLength++] = '\\';
×
98
          dst[dstLength++] = '\"';
×
99
          break;
×
100
        case '\\':
×
101
          dst[dstLength++] = '\\';
×
102
          dst[dstLength++] = '\\';
×
103
          break;
×
104
        case '\b':
×
105
          dst[dstLength++] = '\\';
×
106
          dst[dstLength++] = 'b';
×
107
          break;
×
108
        case '\f':
×
109
          dst[dstLength++] = '\\';
×
110
          dst[dstLength++] = 'f';
×
111
          break;
×
112
        case '\n':
×
113
          dst[dstLength++] = '\\';
×
114
          dst[dstLength++] = 'n';
×
115
          break;
×
116
        case '\r':
×
117
          dst[dstLength++] = '\\';
×
118
          dst[dstLength++] = 'r';
×
119
          break;
×
120
        case '\t':
×
121
          dst[dstLength++] = '\\';
×
122
          dst[dstLength++] = 't';
×
123
          break;
×
124
        default:
×
125
          dst[dstLength++] = src[i];
×
126
      }
127
    }
128
  }
129

130
  return dstLength;
555,518✔
131
}
132

133
bool chkRequestKilled(void* param) {
2,147,483,647✔
134
  bool         killed = false;
2,147,483,647✔
135
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
136
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647✔
137
    killed = true;
2,128✔
138
  }
139

140
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
141

142
  return killed;
2,147,483,647✔
143
}
144

145
void cleanupAppInfo() {
1,755,766✔
146
  taosHashCleanup(appInfo.pInstMap);
1,755,766✔
147
  taosHashCleanup(appInfo.pInstMapByClusterId);
1,755,766✔
148
  tscInfo("cluster instance map cleaned");
1,755,766✔
149
}
1,755,766✔
150

151
static int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db,
152
                               __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType,
153
                               STscObj** pTscObj);
154

155
int32_t taos_connect_by_auth(const char* ip, const char* user, const char* auth, const char* totp,
82,903,482✔
156
                                    const char* db, uint16_t port, int connType, STscObj** pObj) {
157
  TSC_ERR_RET(taos_init());
82,903,482✔
158

159
  if (user == NULL) {
82,903,949✔
160
    if (auth == NULL || strlen(auth) != (TSDB_TOKEN_LEN - 1)) {
3,020✔
161
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOKEN);
721✔
162
    }
163
  } else if (!validateUserName(user)) {
82,900,929✔
164
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
×
165
  }
166
  int32_t code = 0;
82,902,586✔
167

168
  char localDb[TSDB_DB_NAME_LEN] = {0};
82,902,586✔
169
  if (db != NULL && strlen(db) > 0) {
82,902,586✔
170
    if (!validateDbName(db)) {
500,882✔
171
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
×
172
    }
173

174
    tstrncpy(localDb, db, sizeof(localDb));
500,678✔
175
    (void)strdequote(localDb);
500,698✔
176
  }
177

178
  int32_t totpCode = -1;
82,901,885✔
179
  if (totp != NULL) {
82,901,885✔
180
    char* endptr = NULL;
2,336✔
181
    totpCode = taosStr2Int32(totp, &endptr, 10);
2,336✔
182
    if (endptr == totp || *endptr != '\0' || totpCode < 0 || totpCode > 999999) {
2,336✔
183
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOTP_CODE);
×
184
    }
185
  }
186

187
  SCorEpSet epSet = {0};
82,901,885✔
188
  if (ip) {
82,901,689✔
189
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
80,710,978✔
190
  } else {
191
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
2,190,711✔
192
  }
193

194
  if (port) {
82,902,947✔
195
    epSet.epSet.eps[0].port = port;
80,087,599✔
196
    epSet.epSet.eps[1].port = port;
80,087,599✔
197
  }
198

199
  char* key = getClusterKey(user, auth, ip, port);
82,902,947✔
200
  if (NULL == key) {
82,904,242✔
201
    TSC_ERR_RET(terrno);
×
202
  }
203
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
82,904,242✔
204
          user ? user : "", db, key);
205
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
168,000,747✔
206
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
85,095,638✔
207
  }
208

209
  SAppInstInfo** pInst = NULL;
82,905,109✔
210
  code = taosThreadMutexLock(&appInfo.mutex);
82,905,109✔
211
  if (TSDB_CODE_SUCCESS != code) {
82,905,046✔
212
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
213
    TSC_ERR_RET(code);
×
214
  }
215

216
  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
82,905,046✔
217
  SAppInstInfo* p = NULL;
82,905,046✔
218
  if (pInst == NULL) {
82,905,046✔
219
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
1,829,026✔
220
    if (NULL == p) {
1,829,026✔
221
      TSC_ERR_JRET(terrno);
×
222
    }
223
    p->mgmtEp = epSet;
1,829,026✔
224
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
1,829,026✔
225
    if (TSDB_CODE_SUCCESS != code) {
1,829,026✔
226
      taosMemoryFree(p);
×
227
      TSC_ERR_JRET(code);
×
228
    }
229
    code = openTransporter(user, auth, tsNumOfCores / 2, &p->pTransporter);
1,829,026✔
230
    if (TSDB_CODE_SUCCESS != code) {
1,829,026✔
231
      taosMemoryFree(p);
×
232
      TSC_ERR_JRET(code);
×
233
    }
234
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
1,829,026✔
235
    if (TSDB_CODE_SUCCESS != code) {
1,829,026✔
236
      destroyAppInst(&p);
×
237
      TSC_ERR_JRET(code);
×
238
    }
239
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
1,829,026✔
240
    if (TSDB_CODE_SUCCESS != code) {
1,829,026✔
241
      destroyAppInst(&p);
×
242
      TSC_ERR_JRET(code);
×
243
    }
244
    p->instKey = key;
1,829,026✔
245
    key = NULL;
1,829,026✔
246
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user ? user : "", epSet.epSet.eps[0].fqdn,
1,829,026✔
247
            epSet.epSet.eps[0].port);
248

249
    pInst = &p;
1,829,026✔
250
  } else {
251
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
81,076,020✔
252
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
×
253
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
×
254
    }
255
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
256
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
81,076,020✔
257
  }
258

259
_return:
82,905,046✔
260

261
  if (TSDB_CODE_SUCCESS != code) {
82,905,046✔
262
    (void)taosThreadMutexUnlock(&appInfo.mutex);
×
263
    taosMemoryFreeClear(key);
×
264
    return code;
×
265
  } else {
266
    code = taosThreadMutexUnlock(&appInfo.mutex);
82,905,046✔
267
    taosMemoryFreeClear(key);
82,903,380✔
268
    if (TSDB_CODE_SUCCESS != code) {
82,905,046✔
269
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
270
      return code;
×
271
    }
272
    return taosConnectImpl(user, auth, totpCode, localDb, NULL, NULL, *pInst, connType, pObj);
82,905,046✔
273
  }
274
}
275

276
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
82,900,759✔
277
                              uint16_t port, int connType, STscObj** pObj) {
278
  char auth[TSDB_PASSWORD_LEN + 1] = {0};
82,900,759✔
279
  if (!validatePassword(pass)) {
82,900,759✔
280
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
×
281
  }
282

283
  taosEncryptPass_c((uint8_t*)pass, strlen(pass), auth);
82,902,585✔
284
  return taos_connect_by_auth(ip, user, auth, totp, db, port, connType, pObj);
82,901,082✔
285
}
286

287
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
288
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
289
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
290
//     return *ppAppInstInfo;
291
//   } else {
292
//     return NULL;
293
//   }
294
// }
295

296
void freeQueryParam(SSyncQueryParam* param) {
503,802✔
297
  if (param == NULL) return;
503,802✔
298
  if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
503,802✔
299
    tscError("failed to destroy semaphore in freeQueryParam");
×
300
  }
301
  taosMemoryFree(param);
503,802✔
302
}
303

304
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
921,631,274✔
305
                     SRequestObj** pRequest, int64_t reqid) {
306
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
921,631,274✔
307
  if (TSDB_CODE_SUCCESS != code) {
921,636,460✔
308
    tscError("failed to malloc sqlObj, %s", sql);
×
309
    return code;
×
310
  }
311

312
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
921,636,460✔
313
  if ((*pRequest)->sqlstr == NULL) {
921,635,457✔
314
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
315
    destroyRequest(*pRequest);
×
316
    *pRequest = NULL;
×
317
    return terrno;
×
318
  }
319

320
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
921,642,678✔
321
  (*pRequest)->sqlstr[sqlLen] = 0;
921,653,871✔
322
  (*pRequest)->sqlLen = sqlLen;
921,653,463✔
323
  (*pRequest)->validateOnly = validateSql;
921,653,052✔
324
  (*pRequest)->stmtBindVersion = 0;
921,649,581✔
325

326
  code = sqlSecurityCheckStringLevel(*pRequest, (*pRequest)->sqlstr, (*pRequest)->sqlLen);
921,652,657✔
327
  if (code != TSDB_CODE_SUCCESS) {
921,632,854✔
328
    tscWarn("req:0x%" PRIx64 ", sql security string check failed, QID:0x%" PRIx64 ", code:%s", (*pRequest)->self,
1,328✔
329
            (*pRequest)->requestId, tstrerror(code));
330
    destroyRequest(*pRequest);
1,328✔
331
    *pRequest = NULL;
1,328✔
332
    return code;
1,328✔
333
  }
334

335
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
921,631,526✔
336

337
  STscObj* pTscObj = (*pRequest)->pTscObj;
921,644,025✔
338
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
921,640,694✔
339
                             sizeof((*pRequest)->self));
340
  if (err) {
921,641,288✔
341
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
342
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
343
    destroyRequest(*pRequest);
×
344
    *pRequest = NULL;
×
345
    return terrno;
×
346
  }
347

348
  (*pRequest)->allocatorRefId = -1;
921,641,288✔
349
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
921,646,200✔
350
    if (TSDB_CODE_SUCCESS !=
429,284,510✔
351
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
429,276,578✔
352
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
353
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
354
      destroyRequest(*pRequest);
×
355
      *pRequest = NULL;
×
356
      return terrno;
×
357
    }
358
  }
359

360
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
921,654,165✔
361
  return TSDB_CODE_SUCCESS;
921,640,076✔
362
}
363

364
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
×
365
  int32_t code =
366
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
×
367
  if (TSDB_CODE_SUCCESS == code) {
×
368
    pRequest->relation.prevRefId = (*pNewRequest)->self;
×
369
    (*pNewRequest)->relation.nextRefId = pRequest->self;
×
370
    (*pNewRequest)->relation.userRefId = pRequest->self;
×
371
    (*pNewRequest)->isSubReq = true;
×
372
  }
373
  return code;
×
374
}
375

376
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
6,817,532✔
377
  STscObj* pTscObj = pRequest->pTscObj;
6,817,532✔
378

379
  SParseContext cxt = {
6,821,179✔
380
      .requestId = pRequest->requestId,
6,821,462✔
381
      .requestRid = pRequest->self,
6,819,051✔
382
      .acctId = pTscObj->acctId,
6,819,452✔
383
      .db = pRequest->pDb,
6,820,467✔
384
      .topicQuery = topicQuery,
385
      .pSql = pRequest->sqlstr,
6,819,791✔
386
      .sqlLen = pRequest->sqlLen,
6,819,819✔
387
      .pMsg = pRequest->msgBuf,
6,816,399✔
388
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
389
      .pTransporter = pTscObj->pAppInfo->pTransporter,
6,817,177✔
390
      .pStmtCb = pStmtCb,
391
      .pUser = pTscObj->user,
6,818,100✔
392
      .userId = pTscObj->userId,
6,817,099✔
393
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
6,816,166✔
394
      .enableSysInfo = pTscObj->sysInfo,
6,815,783✔
395
      .svrVer = pTscObj->sVer,
6,813,813✔
396
      .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
6,817,845✔
397
      .stmtBindVersion = pRequest->stmtBindVersion,
6,818,802✔
398
      .setQueryFp = setQueryRequest,
399
      .timezone = pTscObj->optionInfo.timezone,
6,816,539✔
400
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
6,816,743✔
401
  };
402

403
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
6,818,795✔
404
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
6,821,149✔
405
  if (code != TSDB_CODE_SUCCESS) {
6,817,138✔
406
    return code;
×
407
  }
408

409
  code = qParseSql(&cxt, pQuery);
6,817,138✔
410
  if (TSDB_CODE_SUCCESS == code) {
6,811,760✔
411
    if ((*pQuery)->haveResultSet) {
6,813,124✔
412
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols,
×
413
                              (*pQuery)->pResExtSchema, pRequest->stmtBindVersion > 0);
×
414
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
×
415
    }
416
  }
417

418
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
6,811,423✔
419
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
6,808,200✔
420
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
6,812,414✔
421
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
6,803,819✔
422
  }
423

424
  taosArrayDestroy(cxt.pTableMetaPos);
6,799,961✔
425
  taosArrayDestroy(cxt.pTableVgroupPos);
6,805,028✔
426

427
  return code;
6,806,693✔
428
}
429

430
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
×
431
  SRetrieveTableRsp* pRsp = NULL;
×
432
  int8_t             biMode = atomic_load_8(&pRequest->pTscObj->biMode);
×
433
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode,
×
434
                              pRequest->pTscObj->optionInfo.charsetCxt);
×
435
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
×
436
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
×
437
                                 pRequest->stmtBindVersion > 0);
×
438
  }
439

440
  return code;
×
441
}
442

443
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
368,681✔
444
  // drop table if exists not_exists_table
445
  if (NULL == pQuery->pCmdMsg) {
368,681✔
446
    return TSDB_CODE_SUCCESS;
×
447
  }
448

449
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
368,681✔
450
  pRequest->type = pMsgInfo->msgType;
368,681✔
451
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
368,681✔
452
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
368,681✔
453

454
  STscObj*      pTscObj = pRequest->pTscObj;
368,681✔
455
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
368,681✔
456

457
  // int64_t transporterId = 0;
458
  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
368,681✔
459
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
368,681✔
460
  return TSDB_CODE_SUCCESS;
368,681✔
461
}
462

463
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
1,515,268,779✔
464

465
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
5,648,335✔
466
  SRetrieveTableRsp* pRsp = NULL;
5,648,335✔
467
  if (pRequest->validateOnly) {
5,648,335✔
468
    doRequestCallback(pRequest, 0);
10,071✔
469
    return;
10,071✔
470
  }
471

472
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
11,248,216✔
473
                              atomic_load_8(&pRequest->pTscObj->biMode), pRequest->pTscObj->optionInfo.charsetCxt);
11,248,216✔
474
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
5,638,264✔
475
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
3,128,197✔
476
                                 pRequest->stmtBindVersion > 0);
3,128,197✔
477
  }
478

479
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
5,638,264✔
480
  pRequest->code = code;
5,638,264✔
481

482
  if (pRequest->code != TSDB_CODE_SUCCESS) {
5,638,264✔
483
    pResultInfo->numOfRows = 0;
1,653✔
484
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
1,653✔
485
             pRequest->requestId);
486
  } else {
487
    tscDebug(
5,636,611✔
488
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
489
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
490
  }
491

492
  doRequestCallback(pRequest, code);
5,638,264✔
493
}
494

495
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
95,912,577✔
496
  if (pRequest->validateOnly) {
95,912,577✔
497
    doRequestCallback(pRequest, 0);
×
498
    return TSDB_CODE_SUCCESS;
×
499
  }
500

501
  // drop table if exists not_exists_table
502
  if (NULL == pQuery->pCmdMsg) {
95,912,577✔
503
    doRequestCallback(pRequest, 0);
6,521✔
504
    return TSDB_CODE_SUCCESS;
6,521✔
505
  }
506

507
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
95,906,023✔
508
  pRequest->type = pMsgInfo->msgType;
95,905,429✔
509
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
95,906,669✔
510
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
95,906,012✔
511

512
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
95,906,570✔
513
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
95,905,053✔
514

515
  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
95,911,470✔
516
  if (code) {
95,916,600✔
517
    doRequestCallback(pRequest, code);
×
518
  }
519
  return code;
95,910,007✔
520
}
521

522
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
123,454✔
523
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
123,454✔
524
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
123,454✔
525

526
  if (node1->load < node2->load) {
123,454✔
527
    return -1;
×
528
  }
529

530
  return node1->load > node2->load;
123,454✔
531
}
532

533
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
320,095✔
534
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
320,095✔
535
  if (pInfo->pQnodeList) {
320,095✔
536
    taosArrayDestroy(pInfo->pQnodeList);
315,892✔
537
    pInfo->pQnodeList = NULL;
315,892✔
538
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
315,892✔
539
  }
540

541
  if (pNodeList) {
320,095✔
542
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
320,095✔
543
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
320,095✔
544
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
320,095✔
545
             taosArrayGetSize(pInfo->pQnodeList));
546
  }
547
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
320,095✔
548

549
  return TSDB_CODE_SUCCESS;
320,095✔
550
}
551

552
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
915,676,156✔
553
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
915,676,156✔
554
    *required = false;
903,798,817✔
555
    return TSDB_CODE_SUCCESS;
903,790,152✔
556
  }
557

558
  int32_t       code = TSDB_CODE_SUCCESS;
11,877,339✔
559
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
11,877,339✔
560
  *required = false;
11,877,826✔
561

562
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
11,877,826✔
563
  *required = (NULL == pInfo->pQnodeList);
11,877,826✔
564
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
11,878,313✔
565
  return TSDB_CODE_SUCCESS;
11,877,826✔
566
}
567

568
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
×
569
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
×
570
  int32_t       code = 0;
×
571

572
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
×
573
  if (pInfo->pQnodeList) {
×
574
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
×
575
  }
576
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
×
577
  if (NULL == *pNodeList) {
×
578
    SCatalog* pCatalog = NULL;
×
579
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
580
    if (TSDB_CODE_SUCCESS == code) {
×
581
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
×
582
      if (NULL == pNodeList) {
×
583
        TSC_ERR_RET(terrno);
×
584
      }
585
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
586
                               .requestId = pRequest->requestId,
×
587
                               .requestObjRefId = pRequest->self,
×
588
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
589
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
×
590
    }
591

592
    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
×
593
      code = updateQnodeList(pInfo, *pNodeList);
×
594
    }
595
  }
596

597
  return code;
×
598
}
599

600
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
7,878,422✔
601
  pRequest->type = pQuery->msgType;
7,878,422✔
602
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
7,875,442✔
603

604
  SPlanContext cxt = {.queryId = pRequest->requestId,
8,195,572✔
605
                      .acctId = pRequest->pTscObj->acctId,
7,872,398✔
606
                      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
7,875,749✔
607
                      .pAstRoot = pQuery->pRoot,
7,884,746✔
608
                      .showRewrite = pQuery->showRewrite,
7,886,785✔
609
                      .pMsg = pRequest->msgBuf,
7,878,781✔
610
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
611
                      .pUser = pRequest->pTscObj->user,
7,879,154✔
612
                      .userId = pRequest->pTscObj->userId,
7,866,817✔
613
                      .timezone = pRequest->pTscObj->optionInfo.timezone,
7,868,815✔
614
                      .sysInfo = pRequest->pTscObj->sysInfo};
7,871,151✔
615

616
  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
7,866,492✔
617
}
618

619
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
240,335,758✔
620
                         const SExtSchema* pExtSchema, bool isStmt) {
621
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
240,335,758✔
UNCOV
622
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
623
    return TSDB_CODE_INVALID_PARA;
×
624
  }
625

626
  pResInfo->numOfCols = numOfCols;
240,355,227✔
627
  if (pResInfo->fields != NULL) {
240,354,977✔
628
    taosMemoryFree(pResInfo->fields);
14,832✔
629
  }
630
  if (pResInfo->userFields != NULL) {
240,352,644✔
631
    taosMemoryFree(pResInfo->userFields);
14,832✔
632
  }
633
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
240,349,514✔
634
  if (NULL == pResInfo->fields) return terrno;
240,348,639✔
635
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
240,350,687✔
636
  if (NULL == pResInfo->userFields) {
240,340,506✔
637
    taosMemoryFree(pResInfo->fields);
×
638
    return terrno;
×
639
  }
640
  if (numOfCols != pResInfo->numOfCols) {
240,339,954✔
641
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
642
    return TSDB_CODE_FAILED;
×
643
  }
644

645
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
1,186,510,554✔
646
    pResInfo->fields[i].type = pSchema[i].type;
946,152,267✔
647

648
    pResInfo->userFields[i].type = pSchema[i].type;
946,154,024✔
649
    // userFields must convert to type bytes, no matter isStmt or not
650
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
946,155,695✔
651
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
946,155,867✔
652
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
946,158,363✔
653
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
1,750,029✔
654
    }
655

656
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
946,166,477✔
657
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
946,169,085✔
658
  }
659
  return TSDB_CODE_SUCCESS;
240,362,471✔
660
}
661

662
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
187,942,387✔
663
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
187,942,387✔
664
      precision != TSDB_TIME_PRECISION_NANO) {
665
    return;
×
666
  }
667

668
  pResInfo->precision = precision;
187,942,387✔
669
}
670

671
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
191,971,820✔
672
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
191,971,820✔
673
  if (NULL == nodeList) {
191,973,748✔
674
    return terrno;
×
675
  }
676
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
191,982,789✔
677

678
  int32_t dbNum = taosArrayGetSize(pDbVgList);
191,982,789✔
679
  for (int32_t i = 0; i < dbNum; ++i) {
381,817,047✔
680
    SArray* pVg = taosArrayGetP(pDbVgList, i);
189,808,891✔
681
    if (NULL == pVg) {
189,814,872✔
682
      continue;
×
683
    }
684
    int32_t vgNum = taosArrayGetSize(pVg);
189,814,872✔
685
    if (vgNum <= 0) {
189,808,492✔
686
      continue;
559,506✔
687
    }
688

689
    for (int32_t j = 0; j < vgNum; ++j) {
647,729,781✔
690
      SVgroupInfo* pInfo = taosArrayGet(pVg, j);
458,467,140✔
691
      if (NULL == pInfo) {
458,486,967✔
692
        taosArrayDestroy(nodeList);
×
693
        return TSDB_CODE_OUT_OF_RANGE;
×
694
      }
695
      SQueryNodeLoad load = {0};
458,486,967✔
696
      load.addr.nodeId = pInfo->vgId;
458,504,085✔
697
      load.addr.epSet = pInfo->epSet;
458,501,929✔
698

699
      if (NULL == taosArrayPush(nodeList, &load)) {
458,382,165✔
700
        taosArrayDestroy(nodeList);
×
701
        return terrno;
×
702
      }
703
    }
704
  }
705

706
  int32_t vnodeNum = taosArrayGetSize(nodeList);
192,008,156✔
707
  if (vnodeNum > 0) {
192,010,723✔
708
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
188,969,399✔
709
    goto _return;
188,960,301✔
710
  }
711

712
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
3,041,324✔
713
  if (mnodeNum <= 0) {
3,041,591✔
714
    tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
×
715
    goto _return;
×
716
  }
717

718
  void* pData = taosArrayGet(pMnodeList, 0);
3,041,591✔
719
  if (NULL == pData) {
3,041,591✔
720
    taosArrayDestroy(nodeList);
×
721
    return TSDB_CODE_OUT_OF_RANGE;
×
722
  }
723
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
3,041,591✔
724
    taosArrayDestroy(nodeList);
×
725
    return terrno;
×
726
  }
727

728
  tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
3,041,591✔
729

730
_return:
25,284✔
731

732
  *pNodeList = nodeList;
191,994,359✔
733

734
  return TSDB_CODE_SUCCESS;
191,994,055✔
735
}
736

737
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
1,499,615✔
738
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
1,499,615✔
739
  if (NULL == nodeList) {
1,499,615✔
740
    return terrno;
×
741
  }
742

743
  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
1,499,615✔
744
  if (qNodeNum > 0) {
1,499,615✔
745
    void* pData = taosArrayGet(pQnodeList, 0);
1,480,967✔
746
    if (NULL == pData) {
1,480,967✔
747
      taosArrayDestroy(nodeList);
×
748
      return TSDB_CODE_OUT_OF_RANGE;
×
749
    }
750
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
1,480,967✔
751
      taosArrayDestroy(nodeList);
×
752
      return terrno;
×
753
    }
754
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
1,480,967✔
755
    goto _return;
1,480,967✔
756
  }
757

758
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
18,648✔
759
  if (mnodeNum <= 0) {
18,648✔
760
    tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
×
761
    goto _return;
×
762
  }
763

764
  void* pData = taosArrayGet(pMnodeList, 0);
18,648✔
765
  if (NULL == pData) {
18,648✔
766
    taosArrayDestroy(nodeList);
×
767
    return TSDB_CODE_OUT_OF_RANGE;
×
768
  }
769
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
18,648✔
770
    taosArrayDestroy(nodeList);
×
771
    return terrno;
×
772
  }
773

774
  tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
18,648✔
775

776
_return:
×
777

778
  *pNodeList = nodeList;
1,499,615✔
779

780
  return TSDB_CODE_SUCCESS;
1,499,615✔
781
}
782

783
void freeVgList(void* list) {
7,847,173✔
784
  SArray* pList = *(SArray**)list;
7,847,173✔
785
  taosArrayDestroy(pList);
7,852,595✔
786
}
7,832,113✔
787

788
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
185,595,334✔
789
  SArray* pDbVgList = NULL;
185,595,334✔
790
  SArray* pQnodeList = NULL;
185,595,334✔
791
  FDelete fp = NULL;
185,595,334✔
792
  int32_t code = 0;
185,595,334✔
793

794
  switch (tsQueryPolicy) {
185,595,334✔
795
    case QUERY_POLICY_VNODE:
184,106,519✔
796
    case QUERY_POLICY_CLIENT: {
797
      if (pResultMeta) {
184,106,519✔
798
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
184,114,063✔
799
        if (NULL == pDbVgList) {
184,110,723✔
800
          code = terrno;
×
801
          goto _return;
×
802
        }
803
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
184,110,723✔
804
        for (int32_t i = 0; i < dbNum; ++i) {
366,063,470✔
805
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
181,948,501✔
806
          if (pRes->code || NULL == pRes->pRes) {
181,955,041✔
807
            continue;
544✔
808
          }
809

810
          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
363,914,445✔
811
            code = terrno;
×
812
            goto _return;
×
813
          }
814
        }
815
      } else {
816
        fp = freeVgList;
1,420✔
817

818
        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
1,420✔
819
        if (dbNum > 0) {
864✔
820
          SCatalog*     pCtg = NULL;
864✔
821
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
864✔
822
          code = catalogGetHandle(pInst->clusterId, &pCtg);
864✔
823
          if (code != TSDB_CODE_SUCCESS) {
864✔
824
            goto _return;
×
825
          }
826

827
          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
864✔
828
          if (NULL == pDbVgList) {
864✔
829
            code = terrno;
×
830
            goto _return;
×
831
          }
832
          SArray* pVgList = NULL;
864✔
833
          for (int32_t i = 0; i < dbNum; ++i) {
1,728✔
834
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
864✔
835
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
864✔
836
                                     .requestId = pRequest->requestId,
864✔
837
                                     .requestObjRefId = pRequest->self,
864✔
838
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
864✔
839

840
            // catalogGetDBVgList will handle dbFName == null.
841
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
864✔
842
            if (code) {
864✔
843
              goto _return;
×
844
            }
845

846
            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
864✔
847
              code = terrno;
×
848
              goto _return;
×
849
            }
850
          }
851
        }
852
      }
853

854
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
184,115,833✔
855
      break;
184,109,731✔
856
    }
857
    case QUERY_POLICY_HYBRID:
1,499,615✔
858
    case QUERY_POLICY_QNODE: {
859
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
1,520,211✔
860
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
20,596✔
861
        if (pRes->code) {
20,596✔
862
          pQnodeList = NULL;
×
863
        } else {
864
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
20,596✔
865
          if (NULL == pQnodeList) {
20,596✔
866
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
867
            goto _return;
×
868
          }
869
        }
870
      } else {
871
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
1,479,019✔
872
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
1,479,019✔
873
        if (pInst->pQnodeList) {
1,479,019✔
874
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
1,479,019✔
875
          if (NULL == pQnodeList) {
1,479,019✔
876
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
877
            goto _return;
×
878
          }
879
        }
880
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
1,479,019✔
881
      }
882

883
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
1,499,615✔
884
      break;
1,499,615✔
885
    }
886
    default:
110✔
887
      tscError("unknown query policy: %d", tsQueryPolicy);
110✔
888
      return TSDB_CODE_APP_ERROR;
×
889
  }
890

891
_return:
185,609,346✔
892
  taosArrayDestroyEx(pDbVgList, fp);
185,609,346✔
893
  taosArrayDestroy(pQnodeList);
185,614,974✔
894

895
  return code;
185,618,159✔
896
}
897

898
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
7,868,769✔
899
  SArray* pDbVgList = NULL;
7,868,769✔
900
  SArray* pQnodeList = NULL;
7,868,769✔
901
  int32_t code = 0;
7,874,486✔
902

903
  switch (tsQueryPolicy) {
7,874,486✔
904
    case QUERY_POLICY_VNODE:
7,859,838✔
905
    case QUERY_POLICY_CLIENT: {
906
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
7,859,838✔
907
      if (dbNum > 0) {
7,875,553✔
908
        SCatalog*     pCtg = NULL;
7,847,540✔
909
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
7,848,202✔
910
        code = catalogGetHandle(pInst->clusterId, &pCtg);
7,844,480✔
911
        if (code != TSDB_CODE_SUCCESS) {
7,843,778✔
912
          goto _return;
×
913
        }
914

915
        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
7,843,778✔
916
        if (NULL == pDbVgList) {
7,846,811✔
UNCOV
917
          code = terrno;
×
918
          goto _return;
×
919
        }
920
        SArray* pVgList = NULL;
7,846,811✔
921
        for (int32_t i = 0; i < dbNum; ++i) {
15,689,488✔
922
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
7,842,129✔
923
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
7,855,805✔
924
                                   .requestId = pRequest->requestId,
7,850,613✔
925
                                   .requestObjRefId = pRequest->self,
7,847,754✔
926
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
7,850,767✔
927

928
          // catalogGetDBVgList will handle dbFName == null.
929
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
7,860,182✔
930
          if (code) {
7,854,712✔
931
            goto _return;
×
932
          }
933

934
          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
7,859,685✔
935
            code = terrno;
×
936
            goto _return;
×
937
          }
938
        }
939
      }
940

941
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
7,880,049✔
942
      break;
7,878,979✔
943
    }
944
    case QUERY_POLICY_HYBRID:
×
945
    case QUERY_POLICY_QNODE: {
946
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
×
947

948
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
×
949
      break;
×
950
    }
951
    default:
14,648✔
952
      tscError("unknown query policy: %d", tsQueryPolicy);
14,648✔
953
      return TSDB_CODE_APP_ERROR;
×
954
  }
955

956
_return:
7,874,664✔
957

958
  taosArrayDestroyEx(pDbVgList, freeVgList);
7,879,328✔
959
  taosArrayDestroy(pQnodeList);
7,870,367✔
960

961
  return code;
7,875,344✔
962
}
963

964
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
7,874,706✔
965
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
7,874,706✔
966

967
  SExecResult      res = {0};
7,882,752✔
968
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
7,878,454✔
969
                           .requestId = pRequest->requestId,
7,872,124✔
970
                           .requestObjRefId = pRequest->self};
7,873,643✔
971
  SSchedulerReq    req = {
8,205,727✔
972
         .syncReq       = true,
973
         .localReq      = (tsQueryPolicy == QUERY_POLICY_CLIENT),
7,863,778✔
974
         .pConn         = &conn,
975
         .pNodeList     = pNodeList,
976
         .pDag          = pDag,
977
         .sql           = pRequest->sqlstr,
7,863,778✔
978
         .startTs       = pRequest->metric.start,
7,862,075✔
979
         .execFp        = NULL,
980
         .cbParam       = NULL,
981
         .chkKillFp     = chkRequestKilled,
982
         .chkKillParam  = (void*)pRequest->self,
7,852,755✔
983
         .pExecRes      = &res,
984
         .source        = pRequest->source,
7,874,441✔
985
         .secureDelete  = pRequest->secureDelete,
7,878,036✔
986
         .pWorkerCb     = getTaskPoolWorkerCb(),
7,875,410✔
987
  };
988

989
  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
7,871,078✔
990

991
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
7,889,086✔
992
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
7,889,928✔
993

994
  if (code != TSDB_CODE_SUCCESS) {
7,888,256✔
995
    schedulerFreeJob(&pRequest->body.queryJob, 0);
×
996

997
    pRequest->code = code;
×
998
    terrno = code;
×
999
    return pRequest->code;
×
1000
  }
1001

1002
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
7,888,256✔
1003
      TDMT_VND_CREATE_TABLE == pRequest->type) {
25,872✔
1004
    pRequest->body.resInfo.numOfRows = res.numOfRows;
7,872,498✔
1005
    if (TDMT_VND_SUBMIT == pRequest->type) {
7,872,829✔
1006
      STscObj*            pTscObj = pRequest->pTscObj;
7,861,766✔
1007
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
7,861,075✔
1008
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
7,864,816✔
1009
    }
1010

1011
    schedulerFreeJob(&pRequest->body.queryJob, 0);
7,870,594✔
1012
  }
1013

1014
  pRequest->code = res.code;
7,890,431✔
1015
  terrno = res.code;
7,890,059✔
1016
  return pRequest->code;
7,885,637✔
1017
}
1018

1019
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
492,145,215✔
1020
  SArray*      pArray = NULL;
492,145,215✔
1021
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
492,145,215✔
1022
  if (NULL == pRsp->aCreateTbRsp) {
492,145,215✔
1023
    return TSDB_CODE_SUCCESS;
480,170,741✔
1024
  }
1025

1026
  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
11,982,345✔
1027
  for (int32_t i = 0; i < tbNum; ++i) {
26,912,387✔
1028
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
14,929,680✔
1029
    if (pTbRsp->pMeta) {
14,927,345✔
1030
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
14,664,025✔
1031
    }
1032
  }
1033

1034
  return TSDB_CODE_SUCCESS;
11,982,707✔
1035
}
1036

1037
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
147,677,511✔
1038
  int32_t code = 0;
147,677,511✔
1039
  SArray* pArray = NULL;
147,677,511✔
1040
  SArray* pTbArray = (SArray*)res;
147,677,511✔
1041
  int32_t tbNum = taosArrayGetSize(pTbArray);
147,677,511✔
1042
  if (tbNum <= 0) {
147,678,394✔
1043
    return TSDB_CODE_SUCCESS;
×
1044
  }
1045

1046
  pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
147,678,394✔
1047
  if (NULL == pArray) {
147,679,027✔
UNCOV
1048
    return terrno;
×
1049
  }
1050

1051
  for (int32_t i = 0; i < tbNum; ++i) {
477,586,440✔
1052
    STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
329,906,474✔
1053
    if (NULL == tbInfo) {
329,906,624✔
1054
      code = terrno;
×
1055
      goto _return;
×
1056
    }
1057
    STbSVersion tbSver = {
329,906,624✔
1058
        .tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion, .rver = tbInfo->rversion};
329,906,599✔
1059
    if (NULL == taosArrayPush(pArray, &tbSver)) {
329,907,429✔
1060
      code = terrno;
×
1061
      goto _return;
×
1062
    }
1063
  }
1064

1065
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
147,679,966✔
1066
                           .requestId = pRequest->requestId,
147,680,554✔
1067
                           .requestObjRefId = pRequest->self,
147,680,498✔
1068
                           .mgmtEps = *epset};
1069

1070
  code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
147,680,023✔
1071

1072
_return:
147,678,388✔
1073

1074
  taosArrayDestroy(pArray);
147,678,329✔
1075
  return code;
147,679,447✔
1076
}
1077

1078
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
8,361,429✔
1079
  return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
8,361,429✔
1080
}
1081

1082
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
64,313,398✔
1083
  return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
64,313,398✔
1084
}
1085

1086
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
734,763,813✔
1087
  if (NULL == pRequest->body.resInfo.execRes.res) {
734,763,813✔
1088
    return pRequest->code;
49,844,615✔
1089
  }
1090

1091
  SCatalog*     pCatalog = NULL;
684,908,126✔
1092
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
684,910,911✔
1093

1094
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
684,936,067✔
1095
  if (code) {
684,920,221✔
1096
    return code;
×
1097
  }
1098

1099
  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
684,920,221✔
1100
  SExecResult* pRes = &pRequest->body.resInfo.execRes;
684,937,360✔
1101

1102
  switch (pRes->msgType) {
684,929,455✔
1103
    case TDMT_VND_ALTER_TABLE:
3,801,603✔
1104
    case TDMT_MND_ALTER_STB: {
1105
      code = handleAlterTbExecRes(pRes->res, pCatalog);
3,801,603✔
1106
      break;
3,801,603✔
1107
    }
1108
    case TDMT_VND_CREATE_TABLE: {
40,922,391✔
1109
      SArray* pList = (SArray*)pRes->res;
40,922,391✔
1110
      int32_t num = taosArrayGetSize(pList);
40,933,435✔
1111
      for (int32_t i = 0; i < num; ++i) {
88,766,881✔
1112
        void* res = taosArrayGetP(pList, i);
47,826,544✔
1113
        // handleCreateTbExecRes will handle res == null
1114
        code = handleCreateTbExecRes(res, pCatalog);
47,828,004✔
1115
      }
1116
      break;
40,940,337✔
1117
    }
1118
    case TDMT_MND_CREATE_STB: {
364,971✔
1119
      code = handleCreateTbExecRes(pRes->res, pCatalog);
364,971✔
1120
      break;
364,971✔
1121
    }
1122
    case TDMT_VND_SUBMIT: {
492,143,547✔
1123
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
492,143,547✔
1124

1125
      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
492,156,027✔
1126
      break;
492,146,545✔
1127
    }
1128
    case TDMT_SCH_QUERY:
147,678,321✔
1129
    case TDMT_SCH_MERGE_QUERY: {
1130
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
147,678,321✔
1131
      break;
147,680,444✔
1132
    }
1133
    default:
207✔
1134
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
207✔
1135
               pRequest->type, pRequest->requestId);
1136
      code = TSDB_CODE_APP_ERROR;
×
1137
  }
1138

1139
  return code;
684,933,900✔
1140
}
1141

1142
static bool incompletaFileParsing(SNode* pStmt) {
714,506,714✔
1143
  return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
714,506,714✔
1144
}
1145

1146
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
×
1147
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
×
1148

1149
  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
×
1150
  if (TSDB_CODE_SUCCESS == code) {
×
1151
    int64_t analyseStart = taosGetTimestampUs();
×
1152
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
×
1153
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
×
1154
  }
1155

1156
  if (TSDB_CODE_SUCCESS == code) {
×
1157
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
×
1158
  }
1159

1160
  code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
×
1161
  handleQueryAnslyseRes(pWrapper, NULL, code);
×
1162
}
×
1163

1164
void returnToUser(SRequestObj* pRequest) {
69,753,799✔
1165
  if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
69,753,799✔
1166
    // return to client
1167
    doRequestCallback(pRequest, pRequest->code);
69,753,799✔
1168
    return;
69,752,769✔
1169
  }
1170

1171
  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
×
1172
  if (pUserReq) {
×
1173
    pUserReq->code = pRequest->code;
×
1174
    // return to client
1175
    doRequestCallback(pUserReq, pUserReq->code);
×
1176
    (void)releaseRequest(pRequest->relation.userRefId);
×
1177
    return;
×
1178
  } else {
1179
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1180
             pRequest->relation.userRefId, pRequest->requestId);
1181
  }
1182
}
1183

1184
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
×
1185
  int64_t     lastTs = 0;
×
1186
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
×
1187
  int32_t     numOfFields = taos_num_fields(pRes);
×
1188

1189
  int32_t code = createDataBlock(pBlock);
×
1190
  if (code) {
×
1191
    return code;
×
1192
  }
1193

1194
  for (int32_t i = 0; i < numOfFields; ++i) {
×
1195
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
×
1196
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
×
1197
    if (TSDB_CODE_SUCCESS != code) {
×
1198
      blockDataDestroy(*pBlock);
×
1199
      return code;
×
1200
    }
1201
  }
1202

1203
  code = blockDataEnsureCapacity(*pBlock, numOfRows);
×
1204
  if (TSDB_CODE_SUCCESS != code) {
×
1205
    blockDataDestroy(*pBlock);
×
1206
    return code;
×
1207
  }
1208

1209
  for (int32_t i = 0; i < numOfRows; ++i) {
×
1210
    TAOS_ROW pRow = taos_fetch_row(pRes);
×
1211
    if (NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
×
1212
      tscError("invalid data from vnode");
×
1213
      blockDataDestroy(*pBlock);
×
1214
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1215
    }
1216
    int64_t ts = *(int64_t*)pRow[0];
×
1217
    if (lastTs < ts) {
×
1218
      lastTs = ts;
×
1219
    }
1220

1221
    for (int32_t j = 0; j < numOfFields; ++j) {
×
1222
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
×
1223
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
×
1224
      if (TSDB_CODE_SUCCESS != code) {
×
1225
        blockDataDestroy(*pBlock);
×
1226
        return code;
×
1227
      }
1228
    }
1229

1230
    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
×
1231
            *(int64_t*)pRow[2]);
1232
  }
1233

1234
  (*pBlock)->info.window.ekey = lastTs;
×
1235
  (*pBlock)->info.rows = numOfRows;
×
1236

1237
  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
×
1238
  return TSDB_CODE_SUCCESS;
×
1239
}
1240

1241
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
×
1242
  SRequestObj* pRequest = (SRequestObj*)res;
×
1243
  if (pRequest->code) {
×
1244
    returnToUser(pRequest);
×
1245
    return;
×
1246
  }
1247

1248
  SSDataBlock* pBlock = NULL;
×
1249
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
×
1250
  if (TSDB_CODE_SUCCESS != pRequest->code) {
×
1251
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
×
1252
             tstrerror(pRequest->code));
1253
    returnToUser(pRequest);
×
1254
    return;
×
1255
  }
1256

1257
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1258
  if (pNextReq) {
×
1259
    continuePostSubQuery(pNextReq, pBlock);
×
1260
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1261
  } else {
1262
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1263
             pRequest->relation.nextRefId, pRequest->requestId);
1264
  }
1265

1266
  blockDataDestroy(pBlock);
×
1267
}
1268

1269
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
×
1270
  SRequestObj* pRequest = pWrapper->pRequest;
×
1271
  if (TD_RES_QUERY(pRequest)) {
×
1272
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
×
1273
    return;
×
1274
  }
1275

1276
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1277
  if (pNextReq) {
×
1278
    continuePostSubQuery(pNextReq, NULL);
×
1279
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1280
  } else {
1281
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1282
             pRequest->relation.nextRefId, pRequest->requestId);
1283
  }
1284
}
1285

1286
// todo refacto the error code  mgmt
1287
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
726,605,248✔
1288
  SSqlCallbackWrapper* pWrapper = param;
726,605,248✔
1289
  SRequestObj*         pRequest = pWrapper->pRequest;
726,605,248✔
1290
  STscObj*             pTscObj = pRequest->pTscObj;
726,615,068✔
1291

1292
  pRequest->code = code;
726,612,911✔
1293
  if (pResult) {
726,617,063✔
1294
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
726,543,774✔
1295
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
726,550,009✔
1296
  }
1297

1298
  int32_t type = pRequest->type;
726,616,931✔
1299
  if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
726,593,885✔
1300
    if (pResult) {
528,915,471✔
1301
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;
528,875,416✔
1302

1303
      // record the insert rows
1304
      if (TDMT_VND_SUBMIT == type) {
528,880,379✔
1305
        SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
484,682,354✔
1306
        (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
484,689,531✔
1307
      }
1308
    }
1309
    schedulerFreeJob(&pRequest->body.queryJob, 0);
528,929,532✔
1310
  }
1311

1312
  taosMemoryFree(pResult);
726,616,527✔
1313
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
726,605,860✔
1314
           pRequest->requestId);
1315

1316
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL &&
726,604,949✔
1317
      pRequest->stmtBindVersion == 0) {
90,667✔
1318
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
90,571✔
1319
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
1320
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
90,571✔
1321
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1322
    }
1323
    restartAsyncQuery(pRequest, code);
90,571✔
1324
    return;
90,571✔
1325
  }
1326

1327
  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
726,514,378✔
1328
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
726,514,378✔
1329
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
3,336,751✔
1330
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1331
    }
1332
  }
1333

1334
  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
726,525,197✔
1335
  int32_t code1 = handleQueryExecRsp(pRequest);
726,514,462✔
1336
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
726,523,474✔
1337
    pRequest->code = code1;
×
1338
  }
1339

1340
  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
1,441,029,474✔
1341
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
714,502,753✔
1342
    continueInsertFromCsv(pWrapper, pRequest);
11,965✔
1343
    return;
11,965✔
1344
  }
1345

1346
  if (pRequest->relation.nextRefId) {
726,519,915✔
1347
    handlePostSubQuery(pWrapper);
×
1348
  } else {
1349
    destorySqlCallbackWrapper(pWrapper);
726,515,719✔
1350
    pRequest->pWrapper = NULL;
726,510,161✔
1351

1352
    // return to client
1353
    doRequestCallback(pRequest, code);
726,511,823✔
1354
  }
1355
}
1356

1357
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
8,235,865✔
1358
  int32_t code = 0;
8,235,865✔
1359
  int32_t subplanNum = 0;
8,235,865✔
1360

1361
  if (pQuery->pRoot) {
8,235,865✔
1362
    pRequest->stmtType = pQuery->pRoot->type;
7,883,054✔
1363
    if (nodeType(pQuery->pRoot) == QUERY_NODE_DELETE_STMT) {
7,880,344✔
1364
      pRequest->secureDelete = ((SDeleteStmt*)pQuery->pRoot)->secureDelete;
×
1365
    }
1366
  }
1367

1368
  if (pQuery->pRoot && !pRequest->inRetry) {
8,237,930✔
1369
    STscObj*            pTscObj = pRequest->pTscObj;
7,872,691✔
1370
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
7,875,692✔
1371
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
7,883,744✔
1372
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
7,856,557✔
1373
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
12,272✔
1374
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
12,279✔
1375
    }
1376
  }
1377

1378
  pRequest->body.execMode = pQuery->execMode;
8,257,973✔
1379
  switch (pQuery->execMode) {
8,247,061✔
1380
    case QUERY_EXEC_MODE_LOCAL:
×
1381
      if (!pRequest->validateOnly) {
×
1382
        if (NULL == pQuery->pRoot) {
×
1383
          terrno = TSDB_CODE_INVALID_PARA;
×
1384
          code = terrno;
×
1385
        } else {
1386
          code = execLocalCmd(pRequest, pQuery);
×
1387
        }
1388
      }
1389
      break;
×
1390
    case QUERY_EXEC_MODE_RPC:
368,668✔
1391
      if (!pRequest->validateOnly) {
368,668✔
1392
        code = execDdlQuery(pRequest, pQuery);
368,668✔
1393
      }
1394
      break;
368,681✔
1395
    case QUERY_EXEC_MODE_SCHEDULE: {
7,854,518✔
1396
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
7,854,518✔
1397
      if (NULL == pMnodeList) {
7,866,338✔
1398
        code = terrno;
×
1399
        break;
×
1400
      }
1401
      SQueryPlan* pDag = NULL;
7,866,338✔
1402
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
7,866,338✔
1403
      if (TSDB_CODE_SUCCESS == code) {
7,872,431✔
1404
        pRequest->body.subplanNum = pDag->numOfSubplans;
7,876,483✔
1405
        if (!pRequest->validateOnly) {
7,879,634✔
1406
          SArray* pNodeList = NULL;
7,858,871✔
1407
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
7,860,242✔
1408

1409
          if (TSDB_CODE_SUCCESS == code) {
7,872,317✔
1410
            code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
7,877,371✔
1411
                                        taosArrayGetSize(pNodeList));
7,869,325✔
1412
          }
1413

1414
          if (TSDB_CODE_SUCCESS == code) {
7,870,442✔
1415
            code = scheduleQuery(pRequest, pDag, pNodeList);
7,870,442✔
1416
          }
1417
          taosArrayDestroy(pNodeList);
7,884,182✔
1418
        }
1419
      }
1420
      taosArrayDestroy(pMnodeList);
7,878,925✔
1421
      break;
7,886,040✔
1422
    }
1423
    case QUERY_EXEC_MODE_EMPTY_RESULT:
×
1424
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
1425
      break;
×
1426
    default:
×
1427
      break;
×
1428
  }
1429

1430
  if (!keepQuery) {
8,255,052✔
1431
    qDestroyQuery(pQuery);
×
1432
  }
1433

1434
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
8,255,052✔
1435
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
3,710✔
1436
    if (TSDB_CODE_SUCCESS != ret) {
3,710✔
1437
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
×
1438
               pRequest->requestId);
1439
    }
1440
  }
1441

1442
  if (TSDB_CODE_SUCCESS == code) {
8,253,371✔
1443
    code = handleQueryExecRsp(pRequest);
8,252,575✔
1444
  }
1445

1446
  if (TSDB_CODE_SUCCESS != code) {
8,257,602✔
1447
    pRequest->code = code;
3,406✔
1448
  }
1449

1450
  if (res) {
8,257,602✔
1451
    *res = pRequest->body.resInfo.execRes.res;
×
1452
    pRequest->body.resInfo.execRes.res = NULL;
×
1453
  }
1454
}
8,257,602✔
1455

1456
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
726,993,968✔
1457
                                 SSqlCallbackWrapper* pWrapper) {
1458
  int32_t code = TSDB_CODE_SUCCESS;
726,993,968✔
1459
  pRequest->type = pQuery->msgType;
726,993,968✔
1460
  SArray*     pMnodeList = NULL;
726,983,011✔
1461
  SArray*     pNodeList = NULL;
726,983,011✔
1462
  SQueryPlan* pDag = NULL;
726,971,870✔
1463
  int64_t     st = taosGetTimestampUs();
726,993,124✔
1464

1465
  if (!pRequest->parseOnly) {
726,993,124✔
1466
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
727,009,931✔
1467
    if (NULL == pMnodeList) {
726,991,089✔
1468
      code = terrno;
×
1469
    }
1470
    SPlanContext cxt = {.queryId = pRequest->requestId,
808,125,183✔
1471
                        .acctId = pRequest->pTscObj->acctId,
727,033,969✔
1472
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
727,032,514✔
1473
                        .pAstRoot = pQuery->pRoot,
727,041,967✔
1474
                        .showRewrite = pQuery->showRewrite,
727,044,325✔
1475
                        .isView = pWrapper->pParseCtx->isView,
727,026,915✔
1476
                        .isAudit = pWrapper->pParseCtx->isAudit,
727,029,067✔
1477
                        .privInfo = pWrapper->pParseCtx->privInfo,
727,002,839✔
1478
                        .pMsg = pRequest->msgBuf,
727,018,741✔
1479
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1480
                        .pUser = pRequest->pTscObj->user,
727,010,839✔
1481
                        .userId = pRequest->pTscObj->userId,
727,018,982✔
1482
                        .sysInfo = pRequest->pTscObj->sysInfo,
727,033,160✔
1483
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
727,031,204✔
1484
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
727,023,917✔
1485
    if (TSDB_CODE_SUCCESS == code) {
727,021,757✔
1486
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
727,006,930✔
1487
    }
1488
    if (code) {
727,013,125✔
1489
      tscError("req:0x%" PRIx64 " requestId:0x%" PRIx64 ", failed to create query plan, code:%s msg:%s", pRequest->self,
238,016✔
1490
               pRequest->requestId, tstrerror(code), cxt.pMsg);
1491
    } else {
1492
      pRequest->body.subplanNum = pDag->numOfSubplans;
726,775,109✔
1493
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
726,790,466✔
1494
    }
1495
  }
1496

1497
  pRequest->metric.execStart = taosGetTimestampUs();
727,011,224✔
1498
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
727,007,659✔
1499

1500
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
727,012,024✔
1501
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
726,558,963✔
1502
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
185,614,201✔
1503
    }
1504

1505
    if (code == TSDB_CODE_SUCCESS) {
726,592,344✔
1506
      code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
726,562,037✔
1507
                                  taosArrayGetSize(pNodeList));
726,596,007✔
1508
    }
1509

1510
    if (code == TSDB_CODE_SUCCESS) {
726,583,485✔
1511
      SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
726,580,651✔
1512
                               .requestId = pRequest->requestId,
726,586,357✔
1513
                               .requestObjRefId = pRequest->self};
726,595,598✔
1514
      SSchedulerReq    req = {
767,110,085✔
1515
             .syncReq       = false,
1516
             .localReq      = (tsQueryPolicy == QUERY_POLICY_CLIENT),
726,567,970✔
1517
             .pConn         = &conn,
1518
             .pNodeList     = pNodeList,
1519
             .pDag          = pDag,
1520
             .allocatorRefId = pRequest->allocatorRefId,
726,567,970✔
1521
             .sql           = pRequest->sqlstr,
726,546,822✔
1522
             .startTs       = pRequest->metric.start,
726,591,396✔
1523
             .execFp        = schedulerExecCb,
1524
             .cbParam       = pWrapper,
1525
             .chkKillFp     = chkRequestKilled,
1526
             .chkKillParam  = (void*)pRequest->self,
726,573,058✔
1527
             .pExecRes      = NULL,
1528
             .source        = pRequest->source,
726,567,692✔
1529
             .secureDelete  = pRequest->secureDelete,
726,574,381✔
1530
             .pWorkerCb     = getTaskPoolWorkerCb(),
726,583,830✔
1531
      };
1532

1533
      if (TSDB_CODE_SUCCESS == code) {
726,575,350✔
1534
        code = schedulerExecJob(&req, &pRequest->body.queryJob);
726,606,117✔
1535
      }
1536
      taosArrayDestroy(pNodeList);
726,584,402✔
1537
      taosArrayDestroy(pMnodeList);
726,614,421✔
1538
      return code;
726,607,821✔
1539
    }
1540
  }
1541

1542
  qDestroyQueryPlan(pDag);
471,880✔
1543
  tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
422,926✔
1544
           pRequest->requestId);
1545
  destorySqlCallbackWrapper(pWrapper);
422,926✔
1546
  pRequest->pWrapper = NULL;
422,926✔
1547
  if (TSDB_CODE_SUCCESS != code) {
422,926✔
1548
    pRequest->code = code;
240,850✔
1549
  }
1550

1551
  doRequestCallback(pRequest, code);
422,926✔
1552

1553
  // todo not to be released here
1554
  taosArrayDestroy(pMnodeList);
422,926✔
1555
  taosArrayDestroy(pNodeList);
422,926✔
1556

1557
  return code;
420,269✔
1558
}
1559

1560
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
829,419,158✔
1561
  int32_t code = 0;
829,419,158✔
1562

1563
  if (pRequest->parseOnly) {
829,419,158✔
1564
    doRequestCallback(pRequest, 0);
285,252✔
1565
    return;
285,252✔
1566
  }
1567

1568
  pRequest->body.execMode = pQuery->execMode;
829,150,971✔
1569
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
829,132,328✔
1570
    destorySqlCallbackWrapper(pWrapper);
102,146,701✔
1571
    pRequest->pWrapper = NULL;
102,149,980✔
1572
  }
1573

1574
  if (pQuery->pRoot && !pRequest->inRetry) {
829,120,432✔
1575
    STscObj*            pTscObj = pRequest->pTscObj;
829,158,202✔
1576
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
829,159,869✔
1577
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
829,169,537✔
1578
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
540,965,697✔
1579
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
484,432,036✔
1580
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
344,746,602✔
1581
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
138,292,059✔
1582
    }
1583
  }
1584

1585
  switch (pQuery->execMode) {
829,141,638✔
1586
    case QUERY_EXEC_MODE_LOCAL:
5,648,335✔
1587
      asyncExecLocalCmd(pRequest, pQuery);
5,648,335✔
1588
      break;
5,648,335✔
1589
    case QUERY_EXEC_MODE_RPC:
95,918,545✔
1590
      code = asyncExecDdlQuery(pRequest, pQuery);
95,918,545✔
1591
      break;
95,915,513✔
1592
    case QUERY_EXEC_MODE_SCHEDULE: {
727,005,186✔
1593
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
727,005,186✔
1594
      break;
727,029,141✔
1595
    }
1596
    case QUERY_EXEC_MODE_EMPTY_RESULT:
575,498✔
1597
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
575,498✔
1598
      doRequestCallback(pRequest, 0);
575,498✔
1599
      break;
575,498✔
1600
    default:
×
1601
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
×
1602
      doRequestCallback(pRequest, -1);
×
1603
      break;
×
1604
  }
1605
}
1606

1607
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
12,426✔
1608
  SCatalog* pCatalog = NULL;
12,426✔
1609
  int32_t   code = 0;
12,426✔
1610
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
12,426✔
1611
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
12,426✔
1612

1613
  if (dbNum <= 0 && tblNum <= 0) {
12,426✔
1614
    return TSDB_CODE_APP_ERROR;
12,426✔
1615
  }
1616

UNCOV
1617
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
×
UNCOV
1618
  if (code != TSDB_CODE_SUCCESS) {
×
1619
    return code;
×
1620
  }
1621

UNCOV
1622
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
×
UNCOV
1623
                           .requestId = pRequest->requestId,
×
UNCOV
1624
                           .requestObjRefId = pRequest->self,
×
UNCOV
1625
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
×
1626

UNCOV
1627
  for (int32_t i = 0; i < dbNum; ++i) {
×
UNCOV
1628
    char* dbFName = taosArrayGet(pRequest->dbList, i);
×
1629

1630
    // catalogRefreshDBVgInfo will handle dbFName == null.
UNCOV
1631
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
×
UNCOV
1632
    if (code != TSDB_CODE_SUCCESS) {
×
1633
      return code;
×
1634
    }
1635
  }
1636

UNCOV
1637
  for (int32_t i = 0; i < tblNum; ++i) {
×
UNCOV
1638
    SName* tableName = taosArrayGet(pRequest->tableList, i);
×
1639

1640
    // catalogRefreshTableMeta will handle tableName == null.
UNCOV
1641
    code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
×
UNCOV
1642
    if (code != TSDB_CODE_SUCCESS) {
×
1643
      return code;
×
1644
    }
1645
  }
1646

UNCOV
1647
  return code;
×
1648
}
1649

1650
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
4,599,229✔
1651
  SCatalog* pCatalog = NULL;
4,599,229✔
1652
  int32_t   tbNum = taosArrayGetSize(tbList);
4,599,229✔
1653
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
4,599,229✔
1654
  if (code != TSDB_CODE_SUCCESS) {
4,599,229✔
1655
    return code;
×
1656
  }
1657

1658
  if (isView) {
4,599,229✔
1659
    for (int32_t i = 0; i < tbNum; ++i) {
669,120✔
1660
      SName* pViewName = taosArrayGet(tbList, i);
334,560✔
1661
      char   dbFName[TSDB_DB_FNAME_LEN];
331,162✔
1662
      if (NULL == pViewName) {
334,560✔
1663
        continue;
×
1664
      }
1665
      (void)tNameGetFullDbName(pViewName, dbFName);
334,560✔
1666
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
334,560✔
1667
    }
1668
  } else {
1669
    for (int32_t i = 0; i < tbNum; ++i) {
6,635,315✔
1670
      SName* pTbName = taosArrayGet(tbList, i);
2,370,646✔
1671
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
2,370,646✔
1672
    }
1673
  }
1674

1675
  return TSDB_CODE_SUCCESS;
4,599,229✔
1676
}
1677

1678
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
82,902,659✔
1679
  pEpSet->version = 0;
82,902,659✔
1680

1681
  // init mnode ip set
1682
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
82,902,560✔
1683
  mgmtEpSet->numOfEps = 0;
82,902,762✔
1684
  mgmtEpSet->inUse = 0;
82,900,490✔
1685

1686
  if (firstEp && firstEp[0] != 0) {
82,902,762✔
1687
    if (strlen(firstEp) >= TSDB_EP_LEN) {
82,903,354✔
1688
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1689
      return -1;
×
1690
    }
1691

1692
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
82,903,354✔
1693
    if (code != TSDB_CODE_SUCCESS) {
82,902,863✔
1694
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1695
      return terrno;
×
1696
    }
1697
    // uint32_t addr = 0;
1698
    SIpAddr addr = {0};
82,902,863✔
1699
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
82,902,877✔
1700
    if (code) {
82,901,987✔
1701
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
112✔
1702
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1703
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
126✔
1704
    } else {
1705
      mgmtEpSet->numOfEps++;
82,904,733✔
1706
    }
1707
  }
1708

1709
  if (secondEp && secondEp[0] != 0) {
82,902,618✔
1710
    if (strlen(secondEp) >= TSDB_EP_LEN) {
2,190,722✔
1711
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1712
      return terrno;
×
1713
    }
1714

1715
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
2,190,722✔
1716
    if (code != TSDB_CODE_SUCCESS) {
2,190,750✔
1717
      return code;
×
1718
    }
1719
    SIpAddr addr = {0};
2,190,750✔
1720
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
2,190,722✔
1721
    if (code) {
2,190,764✔
1722
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
×
1723
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1724
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
×
1725
    } else {
1726
      mgmtEpSet->numOfEps++;
2,190,764✔
1727
    }
1728
  }
1729

1730
  if (mgmtEpSet->numOfEps == 0) {
82,902,660✔
1731
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
126✔
1732
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
126✔
1733
  }
1734

1735
  return 0;
82,901,835✔
1736
}
1737

1738
int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp,
82,904,651✔
1739
                        void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1740
  *pTscObj = NULL;
82,904,651✔
1741
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
82,904,651✔
1742
  if (TSDB_CODE_SUCCESS != code) {
82,904,600✔
1743
    return code;
×
1744
  }
1745

1746
  SRequestObj* pRequest = NULL;
82,904,600✔
1747
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
82,904,600✔
1748
  if (TSDB_CODE_SUCCESS != code) {
82,904,881✔
1749
    destroyTscObj(*pTscObj);
×
1750
    return code;
×
1751
  }
1752

1753
  pRequest->sqlstr = taosStrdup("taos_connect");
82,904,881✔
1754
  if (pRequest->sqlstr) {
82,902,236✔
1755
    pRequest->sqlLen = strlen(pRequest->sqlstr);
82,902,236✔
1756
  } else {
1757
    return terrno;
×
1758
  }
1759

1760
  SMsgSendInfo* body = NULL;
82,902,236✔
1761
  code = buildConnectMsg(pRequest, &body, totpCode);
82,902,234✔
1762
  if (TSDB_CODE_SUCCESS != code) {
82,900,932✔
1763
    destroyTscObj(*pTscObj);
×
1764
    return code;
×
1765
  }
1766

1767
  // int64_t transporterId = 0;
1768
  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
82,900,932✔
1769
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
82,904,252✔
1770
  if (TSDB_CODE_SUCCESS != code) {
82,902,753✔
1771
    destroyTscObj(*pTscObj);
×
1772
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1773
    return code;
×
1774
  }
1775
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
82,902,753✔
1776
    destroyTscObj(*pTscObj);
597✔
1777
    tscError("failed to wait sem, code:%s", terrstr());
×
1778
    return terrno;
×
1779
  }
1780
  if (pRequest->code != TSDB_CODE_SUCCESS) {
82,902,584✔
1781
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
13,475✔
1782
    tscError("failed to connect to server, reason: %s", errorMsg);
13,475✔
1783

1784
    terrno = pRequest->code;
13,475✔
1785
    destroyRequest(pRequest);
13,475✔
1786
    taos_close_internal(*pTscObj);
13,475✔
1787
    *pTscObj = NULL;
13,475✔
1788
    return terrno;
13,475✔
1789
  }
1790
  if (connType == CONN_TYPE__AUTH_TEST) {
82,889,109✔
1791
    terrno = TSDB_CODE_SUCCESS;
94✔
1792
    destroyRequest(pRequest);
94✔
1793
    taos_close_internal(*pTscObj);
94✔
1794
    *pTscObj = NULL;
1,087✔
1795
    return TSDB_CODE_SUCCESS;
1,087✔
1796
  }
1797

1798
  tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
82,889,015✔
1799
          (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1800
  destroyRequest(pRequest);
82,890,752✔
1801
  return code;
82,888,075✔
1802
}
1803

1804
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode) {
82,902,327✔
1805
  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
82,902,327✔
1806
  if (*pMsgSendInfo == NULL) {
82,904,264✔
1807
    return terrno;
×
1808
  }
1809

1810
  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
82,904,262✔
1811

1812
  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
82,904,262✔
1813
  (*pMsgSendInfo)->requestId = pRequest->requestId;
82,904,262✔
1814
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
82,904,262✔
1815
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
82,904,604✔
1816
  if (NULL == (*pMsgSendInfo)->param) {
82,904,649✔
1817
    taosMemoryFree(*pMsgSendInfo);
×
1818
    return terrno;
×
1819
  }
1820

1821
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
82,904,621✔
1822

1823
  SConnectReq connectReq = {0};
82,904,623✔
1824
  STscObj*    pObj = pRequest->pTscObj;
82,904,623✔
1825

1826
  char* db = getDbOfConnection(pObj);
82,904,059✔
1827
  if (db != NULL) {
82,903,441✔
1828
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
501,027✔
1829
  } else if (terrno) {
82,402,414✔
1830
    taosMemoryFree(*pMsgSendInfo);
×
1831
    return terrno;
×
1832
  }
1833
  taosMemoryFreeClear(db);
82,904,315✔
1834

1835
  connectReq.connType = pObj->connType;
82,904,350✔
1836
  connectReq.pid = appInfo.pid;
82,904,376✔
1837
  connectReq.startTime = appInfo.startTime;
82,904,910✔
1838
  connectReq.totpCode = totpCode;
82,904,910✔
1839
  connectReq.connectTime = taosGetTimestampMs();
82,901,436✔
1840

1841
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
82,901,436✔
1842
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
82,901,998✔
1843
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
82,901,970✔
1844
  tstrncpy(connectReq.token, pObj->token, sizeof(connectReq.token));
82,901,395✔
1845
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
82,901,397✔
1846
  tSignConnectReq(&connectReq);
82,901,395✔
1847

1848
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
82,903,644✔
1849
  void*   pReq = taosMemoryMalloc(contLen);
82,900,581✔
1850
  if (NULL == pReq) {
82,902,316✔
1851
    taosMemoryFree(*pMsgSendInfo);
×
1852
    return terrno;
×
1853
  }
1854

1855
  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
82,902,316✔
1856
    taosMemoryFree(*pMsgSendInfo);
1,646✔
1857
    taosMemoryFree(pReq);
×
1858
    return terrno;
×
1859
  }
1860

1861
  (*pMsgSendInfo)->msgInfo.len = contLen;
82,900,662✔
1862
  (*pMsgSendInfo)->msgInfo.pData = pReq;
82,900,662✔
1863
  return TSDB_CODE_SUCCESS;
82,899,043✔
1864
}
1865

1866
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
1,893,000,102✔
1867
  if (NULL == pEpSet) {
1,893,000,102✔
1868
    return;
1,888,724,949✔
1869
  }
1870

1871
  switch (pSendInfo->target.type) {
4,275,153✔
1872
    case TARGET_TYPE_MNODE:
391✔
1873
      if (NULL == pTscObj) {
391✔
1874
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1875
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1876
        return;
480✔
1877
      }
1878

1879
      SEpSet  originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
391✔
1880
      SEpSet* pOrig = &originEpset;
391✔
1881
      SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
391✔
1882
      SEp*    pNewEp = &pEpSet->eps[pEpSet->inUse];
391✔
1883
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
391✔
1884
               pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
1885
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
391✔
1886
      break;
476,839✔
1887
    case TARGET_TYPE_VNODE: {
4,084,334✔
1888
      if (NULL == pTscObj) {
4,084,334✔
1889
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
480✔
1890
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1891
        return;
480✔
1892
      }
1893

1894
      SCatalog* pCatalog = NULL;
4,083,854✔
1895
      int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
4,083,854✔
1896
      if (code != TSDB_CODE_SUCCESS) {
4,083,832✔
1897
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1898
                 tstrerror(code));
1899
        return;
×
1900
      }
1901

1902
      code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
4,083,832✔
1903
      if (code != TSDB_CODE_SUCCESS) {
4,083,890✔
1904
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1905
                 tstrerror(code));
1906
        return;
×
1907
      }
1908
      taosMemoryFreeClear(pSendInfo->target.dbFName);
4,083,890✔
1909
      break;
4,083,901✔
1910
    }
1911
    default:
196,275✔
1912
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
196,275✔
1913
      break;
196,877✔
1914
  }
1915
}
1916

1917
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
1,893,711,573✔
1918
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
1,893,711,573✔
1919
  if (pMsg->info.ahandle == NULL) {
1,893,711,888✔
1920
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
537,022✔
1921
    rpcFreeCont(pMsg->pCont);
537,022✔
1922
    taosMemoryFree(pEpSet);
537,022✔
1923
    return TSDB_CODE_TSC_INTERNAL_ERROR;
537,022✔
1924
  }
1925

1926
  STscObj* pTscObj = NULL;
1,893,175,404✔
1927

1928
  STraceId* trace = &pMsg->info.traceId;
1,893,175,404✔
1929
  char      tbuf[40] = {0};
1,893,177,572✔
1930
  TRACE_TO_STR(trace, tbuf);
1,893,178,353✔
1931

1932
  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
1,893,201,467✔
1933
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));
1934

1935
  if (pSendInfo->requestObjRefId != 0) {
1,893,202,060✔
1936
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
1,605,949,183✔
1937
    if (pRequest) {
1,605,954,124✔
1938
      if (pRequest->self != pSendInfo->requestObjRefId) {
1,592,776,841✔
1939
        tscError("doProcessMsgFromServer req:0x%" PRId64 " != pSendInfo->requestObjRefId:0x%" PRId64, pRequest->self,
×
1940
                 pSendInfo->requestObjRefId);
1941

1942
        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
×
1943
          tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1944
        }
1945
        rpcFreeCont(pMsg->pCont);
×
1946
        taosMemoryFree(pEpSet);
×
1947
        destroySendMsgInfo(pSendInfo);
×
1948
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1949
      }
1950
      pTscObj = pRequest->pTscObj;
1,592,776,313✔
1951
    }
1952
  }
1953

1954
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
1,893,203,882✔
1955

1956
  SDataBuf buf = {.msgType = pMsg->msgType,
1,893,007,347✔
1957
                  .len = pMsg->contLen,
1,893,008,677✔
1958
                  .pData = NULL,
1959
                  .handle = pMsg->info.handle,
1,893,012,265✔
1960
                  .handleRefId = pMsg->info.refId,
1,893,012,553✔
1961
                  .pEpSet = pEpSet};
1962

1963
  if (pMsg->contLen > 0) {
1,893,015,948✔
1964
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
1,852,238,962✔
1965
    if (buf.pData == NULL) {
1,852,296,315✔
1966
      pMsg->code = terrno;
×
1967
    } else {
1968
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
1,852,296,315✔
1969
    }
1970
  }
1971

1972
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
1,893,086,038✔
1973

1974
  if (pTscObj) {
1,893,149,914✔
1975
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
1,592,749,394✔
1976
    if (TSDB_CODE_SUCCESS != code) {
1,592,785,022✔
1977
      tscError("doProcessMsgFromServer taosReleaseRef failed");
1,106✔
1978
      terrno = code;
1,106✔
1979
      pMsg->code = code;
1,106✔
1980
    }
1981
  }
1982

1983
  rpcFreeCont(pMsg->pCont);
1,893,185,542✔
1984
  destroySendMsgInfo(pSendInfo);
1,893,198,747✔
1985
  return TSDB_CODE_SUCCESS;
1,893,168,386✔
1986
}
1987

1988
int32_t doProcessMsgFromServer(void* param) {
1,893,742,073✔
1989
  AsyncArg* arg = (AsyncArg*)param;
1,893,742,073✔
1990
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
1,893,742,073✔
1991
  taosMemoryFree(arg);
1,893,696,123✔
1992
  return code;
1,893,728,303✔
1993
}
1994

1995
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
1,893,452,052✔
1996
  int32_t code = 0;
1,893,452,052✔
1997
  SEpSet* tEpSet = NULL;
1,893,452,052✔
1998

1999
  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);
1,893,452,052✔
2000

2001
  if (pEpSet != NULL) {
1,893,474,542✔
2002
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
4,281,679✔
2003
    if (NULL == tEpSet) {
4,281,589✔
2004
      code = terrno;
×
2005
      pMsg->code = terrno;
×
2006
      goto _exit;
×
2007
    }
2008
    (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
4,281,589✔
2009
  }
2010

2011
  // pMsg is response msg
2012
  if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
1,893,474,452✔
2013
    // restore origin code
2014
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
82,844,095✔
2015
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
×
2016
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
82,844,095✔
2017
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
×
2018
    }
2019
  } else {
2020
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
2021
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
1,810,624,474✔
2022
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
×
2023
    }
2024
  }
2025

2026
  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
1,893,481,816✔
2027
  if (NULL == arg) {
1,893,448,287✔
2028
    code = terrno;
×
2029
    pMsg->code = code;
×
2030
    goto _exit;
×
2031
  }
2032

2033
  arg->msg = *pMsg;
1,893,448,287✔
2034
  arg->pEpset = tEpSet;
1,893,464,142✔
2035

2036
  if ((code = taosAsyncExec(doProcessMsgFromServer, arg, NULL)) != 0) {
1,893,483,107✔
2037
    pMsg->code = code;
79,143✔
2038
    taosMemoryFree(arg);
79,143✔
2039
    goto _exit;
×
2040
  }
2041
  return;
1,893,541,923✔
2042

2043
_exit:
×
2044
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
×
2045
  code = doProcessMsgFromServerImpl(pMsg, tEpSet);
×
2046
  if (code != 0) {
×
2047
    tscError("failed to sched msg to tsc, tsc ready quit");
×
2048
  }
2049
}
2050

2051
TAOS* taos_connect_totp(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
2,148✔
2052
                        uint16_t port) {
2053
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
2,148✔
2054
  if (user == NULL) {
2,148✔
2055
    user = TSDB_DEFAULT_USER;
×
2056
  }
2057

2058
  if (pass == NULL) {
2,148✔
2059
    pass = TSDB_DEFAULT_PASS;
×
2060
  }
2061

2062
  STscObj* pObj = NULL;
2,148✔
2063
  int32_t  code = taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__QUERY, &pObj);
2,148✔
2064
  if (TSDB_CODE_SUCCESS == code) {
2,148✔
2065
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1,418✔
2066
    if (NULL == rid) {
1,418✔
2067
      tscError("out of memory when taos_connect_totp to %s:%u, user:%s db:%s", ip, port, user, db);
×
2068
      return NULL;
×
2069
    }
2070
    *rid = pObj->id;
1,418✔
2071
    return (TAOS*)rid;
1,418✔
2072
  } else {
2073
    terrno = code;
730✔
2074
  }
2075

2076
  return NULL;
730✔
2077
}
2078

2079
int taos_connect_test(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
188✔
2080
                      uint16_t port) {
2081
  tscInfo("try to test connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
188✔
2082
  if (user == NULL) {
188✔
2083
    user = TSDB_DEFAULT_USER;
×
2084
  }
2085

2086
  if (pass == NULL) {
188✔
2087
    pass = TSDB_DEFAULT_PASS;
×
2088
  }
2089

2090
  STscObj* pObj = NULL;
188✔
2091
  return taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__AUTH_TEST, &pObj);
188✔
2092
}
2093

2094
TAOS* taos_connect_token(const char* ip, const char* token, const char* db, uint16_t port) {
2,364✔
2095
  tscInfo("try to connect to %s:%u by token, db:%s", ip, port, db);
2,364✔
2096

2097
  STscObj* pObj = NULL;
2,364✔
2098
  int32_t  code = taos_connect_by_auth(ip, NULL, token, NULL, db, port, CONN_TYPE__QUERY, &pObj);
2,364✔
2099
  if (TSDB_CODE_SUCCESS == code) {
2,364✔
2100
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1,148✔
2101
    if (NULL == rid) {
1,148✔
2102
      tscError("out of memory when taos_connect_token to %s:%u db:%s", ip, port, db);
×
2103
      return NULL;
×
2104
    }
2105
    *rid = pObj->id;
1,148✔
2106
    return (TAOS*)rid;
1,148✔
2107
  } else {
2108
    terrno = code;
1,216✔
2109
  }
2110

2111
  return NULL;
1,216✔
2112
}
2113

2114
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
138✔
2115
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
138✔
2116
  if (user == NULL) {
138✔
2117
    user = TSDB_DEFAULT_USER;
×
2118
  }
2119

2120
  if (auth == NULL) {
138✔
2121
    tscError("No auth info is given, failed to connect to server");
×
2122
    return NULL;
×
2123
  }
2124

2125
  STscObj* pObj = NULL;
138✔
2126
  int32_t  code = taos_connect_by_auth(ip, user, auth, NULL, db, port, CONN_TYPE__QUERY, &pObj);
138✔
2127
  if (TSDB_CODE_SUCCESS == code) {
138✔
2128
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
45✔
2129
    if (NULL == rid) {
45✔
2130
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2131
    }
2132
    *rid = pObj->id;
45✔
2133
    return (TAOS*)rid;
45✔
2134
  }
2135

2136
  return NULL;
93✔
2137
}
2138

2139
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
2,147,483,647✔
2140
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,147,483,647✔
2141
    SResultColumn* pCol = &pResultInfo->pCol[i];
2,147,483,647✔
2142

2143
    int32_t type = pResultInfo->fields[i].type;
2,147,483,647✔
2144
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->userFields[i].bytes, false);
2,147,483,647✔
2145

2146
    if (IS_VAR_DATA_TYPE(type)) {
2,147,483,647✔
2147
      if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pCol->offset[pResultInfo->current] != -1) {
2,147,483,647✔
2148
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
2,147,483,647✔
2149

2150
        if (IS_STR_DATA_BLOB(type)) {
2,147,483,647✔
2151
          pResultInfo->length[i] = blobDataLen(pStart);
7✔
2152
          pResultInfo->row[i] = blobDataVal(pStart);
71✔
2153
        } else {
2154
          pResultInfo->length[i] = varDataLen(pStart);
2,147,483,647✔
2155
          pResultInfo->row[i] = varDataVal(pStart);
2,147,483,647✔
2156
        }
2157
      } else {
2158
        pResultInfo->row[i] = NULL;
265,336,687✔
2159
        pResultInfo->length[i] = 0;
265,357,823✔
2160
      }
2161
    } else {
2162
      if (!colDataIsNull_f(pCol, pResultInfo->current)) {
2,147,483,647✔
2163
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + schemaBytes * pResultInfo->current;
2,147,483,647✔
2164
        pResultInfo->length[i] = schemaBytes;
2,147,483,647✔
2165
      } else {
2166
        pResultInfo->row[i] = NULL;
1,142,526,370✔
2167
        pResultInfo->length[i] = 0;
1,142,616,511✔
2168
      }
2169
    }
2170
  }
2171
}
2,147,483,647✔
2172

2173
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
×
2174
  if (pRequest == NULL) {
×
2175
    return NULL;
×
2176
  }
2177

2178
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
×
2179
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
×
2180
    // All data has returned to App already, no need to try again
2181
    if (pResultInfo->completed) {
×
2182
      pResultInfo->numOfRows = 0;
×
2183
      return NULL;
×
2184
    }
2185

2186
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
×
2187
    SSchedulerReq   req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
×
2188

2189
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
×
2190
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2191
      pResultInfo->numOfRows = 0;
×
2192
      return NULL;
×
2193
    }
2194

2195
    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData,
×
2196
                                           convertUcs4, pRequest->stmtBindVersion > 0);
×
2197
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2198
      pResultInfo->numOfRows = 0;
×
2199
      return NULL;
×
2200
    }
2201

2202
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
×
2203
             ", complete:%d, QID:0x%" PRIx64,
2204
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
2205

2206
    STscObj*            pTscObj = pRequest->pTscObj;
×
2207
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
×
2208
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
×
2209

2210
    if (pResultInfo->numOfRows == 0) {
×
2211
      return NULL;
×
2212
    }
2213
  }
2214

2215
  if (setupOneRowPtr) {
×
2216
    doSetOneRowPtr(pResultInfo);
×
2217
    pResultInfo->current += 1;
×
2218
  }
2219

2220
  return pResultInfo->row;
×
2221
}
2222

2223
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
209,239,722✔
2224
  tsem_t* sem = param;
209,239,722✔
2225
  if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
209,239,722✔
2226
    tscError("failed to post sem, code:%s", terrstr());
×
2227
  }
2228
}
209,239,735✔
2229

2230
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
1,969,966,532✔
2231
  if (pRequest == NULL) {
1,969,966,532✔
2232
    return NULL;
×
2233
  }
2234

2235
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,969,966,532✔
2236
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
1,969,968,670✔
2237
    // All data has returned to App already, no need to try again
2238
    if (pResultInfo->completed) {
300,144,981✔
2239
      pResultInfo->numOfRows = 0;
90,906,212✔
2240
      return NULL;
90,906,220✔
2241
    }
2242

2243
    // convert ucs4 to native multi-bytes string
2244
    pResultInfo->convertUcs4 = convertUcs4;
209,239,703✔
2245
    tsem_t sem;
208,539,316✔
2246
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
209,239,731✔
2247
      tscError("failed to init sem, code:%s", terrstr());
×
2248
    }
2249
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
209,239,673✔
2250
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
209,239,735✔
2251
      tscError("failed to wait sem, code:%s", terrstr());
×
2252
    }
2253
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
209,239,707✔
2254
      tscError("failed to destroy sem, code:%s", terrstr());
×
2255
    }
2256
    pRequest->inCallback = false;
209,239,735✔
2257
  }
2258

2259
  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
1,879,063,934✔
2260
    return NULL;
10,471,192✔
2261
  } else {
2262
    if (setupOneRowPtr) {
1,868,593,008✔
2263
      doSetOneRowPtr(pResultInfo);
1,668,867,056✔
2264
      pResultInfo->current += 1;
1,668,865,480✔
2265
    }
2266

2267
    return pResultInfo->row;
1,868,591,325✔
2268
  }
2269
}
2270

2271
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
285,412,565✔
2272
  if (pResInfo->row == NULL) {
285,412,565✔
2273
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
173,880,021✔
2274
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
173,878,618✔
2275
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
173,875,794✔
2276
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
173,877,197✔
2277

2278
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
173,880,383✔
2279
      taosMemoryFree(pResInfo->row);
×
2280
      taosMemoryFree(pResInfo->pCol);
×
2281
      taosMemoryFree(pResInfo->length);
×
2282
      taosMemoryFree(pResInfo->convertBuf);
×
2283
      return terrno;
×
2284
    }
2285
  }
2286

2287
  return TSDB_CODE_SUCCESS;
285,415,402✔
2288
}
2289

2290
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
285,325,236✔
2291
  int32_t idx = -1;
285,325,236✔
2292
  iconv_t conv = taosAcquireConv(&idx, C2M, pResultInfo->charsetCxt);
285,325,589✔
2293
  if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
285,324,738✔
2294

2295
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
1,630,483,369✔
2296
    int32_t type = pResultInfo->fields[i].type;
1,345,164,969✔
2297
    int32_t schemaBytes =
2298
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
1,345,168,140✔
2299

2300
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
1,345,165,328✔
2301
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
87,511,086✔
2302
      if (p == NULL) {
87,511,086✔
2303
        taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
×
2304
        return terrno;
×
2305
      }
2306

2307
      pResultInfo->convertBuf[i] = p;
87,511,086✔
2308

2309
      SResultColumn* pCol = &pResultInfo->pCol[i];
87,511,086✔
2310
      for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
2,147,483,647✔
2311
        if (pCol->offset[j] != -1) {
2,147,483,647✔
2312
          char* pStart = pCol->offset[j] + pCol->pData;
2,147,483,647✔
2313

2314
          int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p), conv);
2,147,483,647✔
2315
          if (len < 0 || len > schemaBytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
2,147,483,647✔
2316
            tscError(
7,242✔
2317
                "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
2318
                "colLength[i]):%p",
2319
                len, schemaBytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
2320
            taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
7,242✔
2321
            return TSDB_CODE_TSC_INTERNAL_ERROR;
63✔
2322
          }
2323

2324
          varDataSetLen(p, len);
2,147,483,647✔
2325
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
2,147,483,647✔
2326
          p += (len + VARSTR_HEADER_SIZE);
2,147,483,647✔
2327
        }
2328
      }
2329

2330
      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
87,511,023✔
2331
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
87,511,023✔
2332
    }
2333
  }
2334
  taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
285,325,387✔
2335
  return TSDB_CODE_SUCCESS;
285,327,295✔
2336
}
2337

2338
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
285,323,063✔
2339
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
1,630,475,228✔
2340
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + i;
1,345,157,551✔
2341
    TAOS_FIELD*   pField = pResultInfo->userFields + i;
1,345,159,130✔
2342
    int32_t       type = pFieldE->type;
1,345,155,190✔
2343
    int32_t       bufLen = 0;
1,345,160,582✔
2344
    char*         p = NULL;
1,345,160,582✔
2345
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[i].pData) {
1,345,160,582✔
2346
      continue;
1,342,948,496✔
2347
    } else {
2348
      bufLen = 64;
2,205,298✔
2349
      p = taosMemoryRealloc(pResultInfo->convertBuf[i], bufLen * pResultInfo->numOfRows);
2,205,298✔
2350
      pFieldE->bytes = bufLen;
2,205,298✔
2351
      pField->bytes = bufLen;
2,205,298✔
2352
    }
2353
    if (!p) return terrno;
2,205,298✔
2354
    pResultInfo->convertBuf[i] = p;
2,205,298✔
2355

2356
    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
1,458,069,192✔
2357
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[i].pData + j * tDataTypes[type].bytes), type,
1,455,863,894✔
2358
                                  pFieldE->precision, pFieldE->scale, p, bufLen);
1,455,863,894✔
2359
      p += bufLen;
1,455,863,894✔
2360
      if (TSDB_CODE_SUCCESS != code) {
1,455,863,894✔
2361
        return code;
×
2362
      }
2363
    }
2364
    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
2,205,298✔
2365
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
2,205,298✔
2366
  }
2367
  return 0;
285,321,913✔
2368
}
2369

2370
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
379,098✔
2371
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t) +
757,816✔
2372
         numOfCols * (sizeof(int8_t) + sizeof(int32_t));
378,718✔
2373
}
2374

2375
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
189,549✔
2376
  char*   p = (char*)pResultInfo->pData;
189,549✔
2377
  int32_t blockVersion = *(int32_t*)p;
189,549✔
2378

2379
  int32_t numOfRows = pResultInfo->numOfRows;
189,549✔
2380
  int32_t numOfCols = pResultInfo->numOfCols;
189,549✔
2381

2382
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2383
  // length |
2384
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
189,549✔
2385
  if (numOfCols != cols) {
189,549✔
2386
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2387
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2388
  }
2389

2390
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
189,549✔
2391
  int32_t* colLength = (int32_t*)(p + len);
189,549✔
2392
  len += sizeof(int32_t) * numOfCols;
189,549✔
2393

2394
  char* pStart = p + len;
189,549✔
2395
  for (int32_t i = 0; i < numOfCols; ++i) {
809,410✔
2396
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
619,861✔
2397

2398
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
619,861✔
2399
      int32_t* offset = (int32_t*)pStart;
220,314✔
2400
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
220,314✔
2401
      len += lenTmp;
220,314✔
2402
      pStart += lenTmp;
220,314✔
2403

2404
      int32_t estimateColLen = 0;
220,314✔
2405
      for (int32_t j = 0; j < numOfRows; ++j) {
1,063,353✔
2406
        if (offset[j] == -1) {
843,039✔
2407
          continue;
40,694✔
2408
        }
2409
        char* data = offset[j] + pStart;
802,345✔
2410

2411
        int32_t jsonInnerType = *data;
802,345✔
2412
        char*   jsonInnerData = data + CHAR_BYTES;
802,345✔
2413
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
802,345✔
2414
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
11,160✔
2415
        } else if (tTagIsJson(data)) {
791,185✔
2416
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
193,352✔
2417
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
597,833✔
2418
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
555,983✔
2419
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
41,850✔
2420
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
30,690✔
2421
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
11,160✔
2422
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
11,160✔
2423
        } else if (IS_STR_DATA_BLOB(jsonInnerType)) {
×
2424
          estimateColLen += (BLOBSTR_HEADER_SIZE + 32);
×
2425
        } else {
2426
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
2427
          return -1;
×
2428
        }
2429
      }
2430
      len += TMAX(colLen, estimateColLen);
220,314✔
2431
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
399,547✔
2432
      int32_t lenTmp = numOfRows * sizeof(int32_t);
55,800✔
2433
      len += (lenTmp + colLen);
55,800✔
2434
      pStart += lenTmp;
55,800✔
2435
    } else {
2436
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
343,747✔
2437
      len += (lenTmp + colLen);
343,747✔
2438
      pStart += lenTmp;
343,747✔
2439
    }
2440
    pStart += colLen;
619,861✔
2441
  }
2442

2443
  // Ensure the complete structure of the block, including the blankfill field,
2444
  // even though it is not used on the client side.
2445
  len += sizeof(bool);
189,549✔
2446
  return len;
189,549✔
2447
}
2448

2449
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
285,415,954✔
2450
  int32_t numOfRows = pResultInfo->numOfRows;
285,415,954✔
2451
  int32_t numOfCols = pResultInfo->numOfCols;
285,415,597✔
2452
  bool    needConvert = false;
285,415,428✔
2453
  for (int32_t i = 0; i < numOfCols; ++i) {
1,630,574,283✔
2454
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
1,345,346,661✔
2455
      needConvert = true;
189,549✔
2456
      break;
189,549✔
2457
    }
2458
  }
2459

2460
  if (!needConvert) {
285,417,171✔
2461
    return TSDB_CODE_SUCCESS;
285,227,622✔
2462
  }
2463

2464
  tscDebug("start to convert form json format string");
189,549✔
2465

2466
  char*   p = (char*)pResultInfo->pData;
189,549✔
2467
  int32_t blockVersion = *(int32_t*)p;
189,549✔
2468
  int32_t dataLen = estimateJsonLen(pResultInfo);
189,549✔
2469
  if (dataLen <= 0) {
189,549✔
2470
    tscError("doConvertJson error: estimateJsonLen failed");
×
2471
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2472
  }
2473

2474
  taosMemoryFreeClear(pResultInfo->convertJson);
189,549✔
2475
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
189,549✔
2476
  if (pResultInfo->convertJson == NULL) return terrno;
189,549✔
2477
  char* p1 = pResultInfo->convertJson;
189,549✔
2478

2479
  int32_t totalLen = 0;
189,549✔
2480
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
189,549✔
2481
  if (numOfCols != cols) {
189,549✔
2482
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2483
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2484
  }
2485

2486
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
189,549✔
2487
  (void)memcpy(p1, p, len);
189,549✔
2488

2489
  p += len;
189,549✔
2490
  p1 += len;
189,549✔
2491
  totalLen += len;
189,549✔
2492

2493
  len = sizeof(int32_t) * numOfCols;
189,549✔
2494
  int32_t* colLength = (int32_t*)p;
189,549✔
2495
  int32_t* colLength1 = (int32_t*)p1;
189,549✔
2496
  (void)memcpy(p1, p, len);
189,549✔
2497
  p += len;
189,549✔
2498
  p1 += len;
189,549✔
2499
  totalLen += len;
189,549✔
2500

2501
  char* pStart = p;
189,549✔
2502
  char* pStart1 = p1;
189,549✔
2503
  for (int32_t i = 0; i < numOfCols; ++i) {
809,410✔
2504
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
619,861✔
2505
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
619,861✔
2506
    if (colLen >= dataLen) {
619,861✔
2507
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
×
2508
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2509
    }
2510
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
619,861✔
2511
      int32_t* offset = (int32_t*)pStart;
220,314✔
2512
      int32_t* offset1 = (int32_t*)pStart1;
220,314✔
2513
      len = numOfRows * sizeof(int32_t);
220,314✔
2514
      (void)memcpy(pStart1, pStart, len);
220,314✔
2515
      pStart += len;
220,314✔
2516
      pStart1 += len;
220,314✔
2517
      totalLen += len;
220,314✔
2518

2519
      len = 0;
220,314✔
2520
      for (int32_t j = 0; j < numOfRows; ++j) {
1,063,353✔
2521
        if (offset[j] == -1) {
843,039✔
2522
          continue;
40,694✔
2523
        }
2524
        char* data = offset[j] + pStart;
802,345✔
2525

2526
        int32_t jsonInnerType = *data;
802,345✔
2527
        char*   jsonInnerData = data + CHAR_BYTES;
802,345✔
2528
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
802,345✔
2529
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
802,345✔
2530
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
11,160✔
2531
          varDataSetLen(dst, strlen(varDataVal(dst)));
11,160✔
2532
        } else if (tTagIsJson(data)) {
791,185✔
2533
          char* jsonString = NULL;
193,352✔
2534
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
193,352✔
2535
          if (jsonString == NULL) {
193,352✔
2536
            tscError("doConvertJson error: parseTagDatatoJson failed");
×
2537
            return terrno;
×
2538
          }
2539
          STR_TO_VARSTR(dst, jsonString);
193,352✔
2540
          taosMemoryFree(jsonString);
193,352✔
2541
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
597,833✔
2542
          *(char*)varDataVal(dst) = '\"';
555,983✔
2543
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
555,983✔
2544
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(tmp),
555,983✔
2545
                                         pResultInfo->charsetCxt);
2546
          if (length <= 0) {
555,983✔
2547
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
465✔
2548
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
2549
            length = 0;
465✔
2550
          }
2551
          int32_t escapeLength = escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - CHAR_BYTES * 2,
555,983✔
2552
                                                 varDataVal(tmp), length);
2553
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
555,983✔
2554
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
555,983✔
2555
          tscError("value:%s.", varDataVal(dst));
555,983✔
2556
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
41,850✔
2557
          double jsonVd = *(double*)(jsonInnerData);
30,690✔
2558
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
30,690✔
2559
          varDataSetLen(dst, strlen(varDataVal(dst)));
30,690✔
2560
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
11,160✔
2561
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
11,160✔
2562
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
11,160✔
2563
          varDataSetLen(dst, strlen(varDataVal(dst)));
11,160✔
2564
        } else {
2565
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
×
2566
          return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2567
        }
2568

2569
        offset1[j] = len;
802,345✔
2570
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
802,345✔
2571
        len += varDataTLen(dst);
802,345✔
2572
      }
2573
      colLen1 = len;
220,314✔
2574
      totalLen += colLen1;
220,314✔
2575
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
220,314✔
2576
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
399,547✔
2577
      len = numOfRows * sizeof(int32_t);
55,800✔
2578
      (void)memcpy(pStart1, pStart, len);
55,800✔
2579
      pStart += len;
55,800✔
2580
      pStart1 += len;
55,800✔
2581
      totalLen += len;
55,800✔
2582
      totalLen += colLen;
55,800✔
2583
      (void)memcpy(pStart1, pStart, colLen);
55,800✔
2584
    } else {
2585
      len = BitmapLen(pResultInfo->numOfRows);
343,747✔
2586
      (void)memcpy(pStart1, pStart, len);
343,747✔
2587
      pStart += len;
343,747✔
2588
      pStart1 += len;
343,747✔
2589
      totalLen += len;
343,747✔
2590
      totalLen += colLen;
343,747✔
2591
      (void)memcpy(pStart1, pStart, colLen);
343,747✔
2592
    }
2593
    pStart += colLen;
619,861✔
2594
    pStart1 += colLen1;
619,861✔
2595
  }
2596

2597
  // Ensure the complete structure of the block, including the blankfill field,
2598
  // even though it is not used on the client side.
2599
  // (void)memcpy(pStart1, pStart, sizeof(bool));
2600
  totalLen += sizeof(bool);
189,549✔
2601

2602
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
189,549✔
2603
  pResultInfo->pData = pResultInfo->convertJson;
189,549✔
2604
  return TSDB_CODE_SUCCESS;
189,549✔
2605
}
2606

2607
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
307,993,306✔
2608
  bool convertForDecimal = convertUcs4;
307,993,306✔
2609
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
307,993,306✔
2610
    tscError("setResultDataPtr paras error");
13✔
2611
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2612
  }
2613

2614
  if (pResultInfo->numOfRows == 0) {
307,995,060✔
2615
    return TSDB_CODE_SUCCESS;
22,580,026✔
2616
  }
2617

2618
  if (pResultInfo->pData == NULL) {
285,415,225✔
2619
    tscError("setResultDataPtr error: pData is NULL");
×
2620
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2621
  }
2622

2623
  int32_t code = doPrepareResPtr(pResultInfo);
285,413,431✔
2624
  if (code != TSDB_CODE_SUCCESS) {
285,415,045✔
2625
    return code;
×
2626
  }
2627
  code = doConvertJson(pResultInfo);
285,415,045✔
2628
  if (code != TSDB_CODE_SUCCESS) {
285,412,063✔
2629
    return code;
×
2630
  }
2631

2632
  char* p = (char*)pResultInfo->pData;
285,412,063✔
2633

2634
  // version:
2635
  int32_t blockVersion = *(int32_t*)p;
285,412,777✔
2636
  p += sizeof(int32_t);
285,414,193✔
2637

2638
  int32_t dataLen = *(int32_t*)p;
285,415,256✔
2639
  p += sizeof(int32_t);
285,415,248✔
2640

2641
  int32_t rows = *(int32_t*)p;
285,416,818✔
2642
  p += sizeof(int32_t);
285,417,175✔
2643

2644
  int32_t cols = *(int32_t*)p;
285,416,818✔
2645
  p += sizeof(int32_t);
285,416,822✔
2646

2647
  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
285,416,814✔
2648
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
8✔
2649
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
2650
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2651
  }
2652

2653
  int32_t hasColumnSeg = *(int32_t*)p;
285,416,660✔
2654
  p += sizeof(int32_t);
285,415,763✔
2655

2656
  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)p);
285,417,163✔
2657
  p += sizeof(uint64_t);
285,417,163✔
2658

2659
  // check fields
2660
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
1,630,804,256✔
2661
    int8_t type = *(int8_t*)p;
1,345,393,754✔
2662
    p += sizeof(int8_t);
1,345,389,518✔
2663

2664
    int32_t bytes = *(int32_t*)p;
1,345,393,754✔
2665
    p += sizeof(int32_t);
1,345,394,803✔
2666

2667
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
1,345,393,965✔
2668
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
35,138✔
2669
    }
2670
  }
2671

2672
  int32_t* colLength = (int32_t*)p;
285,416,160✔
2673
  p += sizeof(int32_t) * pResultInfo->numOfCols;
285,416,160✔
2674

2675
  char* pStart = p;
285,415,807✔
2676
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
1,630,817,524✔
2677
    if ((pStart - pResultInfo->pData) >= dataLen) {
1,345,403,525✔
2678
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
×
2679
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2680
    }
2681
    if (blockVersion == BLOCK_VERSION_1) {
1,345,386,244✔
2682
      colLength[i] = htonl(colLength[i]);
1,060,282,903✔
2683
    }
2684
    if (colLength[i] >= dataLen) {
1,345,387,202✔
2685
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
×
2686
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2687
    }
2688
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
1,345,396,018✔
2689
      tscError("invalid type %d", pResultInfo->fields[i].type);
1,873✔
2690
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2691
    }
2692
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,345,398,383✔
2693
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
291,812,669✔
2694
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
291,804,897✔
2695
    } else {
2696
      pResultInfo->pCol[i].nullbitmap = pStart;
1,053,600,198✔
2697
      pStart += BitmapLen(pResultInfo->numOfRows);
1,053,600,756✔
2698
    }
2699

2700
    pResultInfo->pCol[i].pData = pStart;
1,345,407,875✔
2701
    pResultInfo->length[i] =
2,147,483,647✔
2702
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
2,147,483,647✔
2703
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
1,345,400,305✔
2704

2705
    pStart += colLength[i];
1,345,399,471✔
2706
  }
2707

2708
  p = pStart;
285,417,868✔
2709
  // bool blankFill = *(bool*)p;
2710
  p += sizeof(bool);
285,417,868✔
2711
  int32_t offset = p - pResultInfo->pData;
285,417,515✔
2712
  if (offset > dataLen) {
285,416,801✔
2713
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
×
2714
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2715
  }
2716

2717
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
2718
  if (convertUcs4) {
285,416,801✔
2719
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
285,326,494✔
2720
  }
2721
#endif
2722
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
285,416,959✔
2723
    code = convertDecimalType(pResultInfo);
285,326,236✔
2724
  }
2725
  return code;
285,410,730✔
2726
}
2727

2728
char* getDbOfConnection(STscObj* pObj) {
1,087,418,416✔
2729
  terrno = TSDB_CODE_SUCCESS;
1,087,418,416✔
2730
  char* p = NULL;
1,087,440,818✔
2731
  (void)taosThreadMutexLock(&pObj->mutex);
1,087,440,818✔
2732
  size_t len = strlen(pObj->db);
1,087,455,108✔
2733
  if (len > 0) {
1,087,455,108✔
2734
    p = taosStrndup(pObj->db, tListLen(pObj->db));
703,898,310✔
2735
    if (p == NULL) {
703,896,124✔
2736
      tscError("failed to taosStrndup db name");
×
2737
    }
2738
  }
2739

2740
  (void)taosThreadMutexUnlock(&pObj->mutex);
1,087,452,922✔
2741
  return p;
1,087,434,074✔
2742
}
2743

2744
void setConnectionDB(STscObj* pTscObj, const char* db) {
82,058,556✔
2745
  if (db == NULL || pTscObj == NULL) {
82,058,556✔
2746
    tscError("setConnectionDB para is NULL");
×
2747
    return;
×
2748
  }
2749

2750
  (void)taosThreadMutexLock(&pTscObj->mutex);
82,095,825✔
2751
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
82,118,065✔
2752
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
82,117,986✔
2753
}
2754

2755
void resetConnectDB(STscObj* pTscObj) {
×
2756
  if (pTscObj == NULL) {
×
2757
    return;
×
2758
  }
2759

2760
  (void)taosThreadMutexLock(&pTscObj->mutex);
×
2761
  pTscObj->db[0] = 0;
×
2762
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
×
2763
}
2764

2765
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
255,602,858✔
2766
                              bool isStmt) {
2767
  if (pResultInfo == NULL || pRsp == NULL) {
255,602,858✔
2768
    tscError("setQueryResultFromRsp paras is null");
23✔
2769
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2770
  }
2771

2772
  taosMemoryFreeClear(pResultInfo->pRspMsg);
255,602,835✔
2773
  pResultInfo->pRspMsg = (const char*)pRsp;
255,602,858✔
2774
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
255,602,858✔
2775
  pResultInfo->current = 0;
255,602,858✔
2776
  pResultInfo->completed = (pRsp->completed == 1);
255,602,858✔
2777
  pResultInfo->precision = pRsp->precision;
255,602,831✔
2778

2779
  // decompress data if needed
2780
  int32_t payloadLen = htonl(pRsp->payloadLen);
255,602,854✔
2781

2782
  if (pRsp->compressed) {
255,602,854✔
2783
    if (pResultInfo->decompBuf == NULL) {
×
2784
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
×
2785
      if (pResultInfo->decompBuf == NULL) {
×
2786
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2787
        return terrno;
×
2788
      }
2789
      pResultInfo->decompBufSize = payloadLen;
×
2790
    } else {
2791
      if (pResultInfo->decompBufSize < payloadLen) {
×
2792
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
×
2793
        if (p == NULL) {
×
2794
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2795
          return terrno;
×
2796
        }
2797

2798
        pResultInfo->decompBuf = p;
×
2799
        pResultInfo->decompBufSize = payloadLen;
×
2800
      }
2801
    }
2802
  }
2803

2804
  if (payloadLen > 0) {
255,602,833✔
2805
    int32_t compLen = *(int32_t*)pRsp->data;
233,023,225✔
2806
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
233,023,225✔
2807

2808
    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
233,023,225✔
2809

2810
    if (pRsp->compressed && compLen < rawLen) {
233,023,225✔
2811
      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
2812
      if (len < 0) {
×
2813
        tscError("tsDecompressString failed");
×
2814
        return terrno ? terrno : TSDB_CODE_FAILED;
×
2815
      }
2816
      if (len != rawLen) {
×
2817
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
×
2818
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2819
      }
2820
      pResultInfo->pData = pResultInfo->decompBuf;
×
2821
      pResultInfo->payloadLen = rawLen;
×
2822
    } else {
2823
      pResultInfo->pData = pStart;
233,023,221✔
2824
      pResultInfo->payloadLen = htonl(pRsp->compLen);
233,023,221✔
2825
      if (pRsp->compLen != pRsp->payloadLen) {
233,023,225✔
2826
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
×
2827
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2828
      }
2829
    }
2830
  }
2831

2832
  // TODO handle the compressed case
2833
  pResultInfo->totalRows += pResultInfo->numOfRows;
255,602,829✔
2834

2835
  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
255,602,852✔
2836
  return code;
255,601,340✔
2837
}
2838

2839
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
185✔
2840
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
185✔
2841
  void*              clientRpc = NULL;
185✔
2842
  SServerStatusRsp   statusRsp = {0};
185✔
2843
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
185✔
2844
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
185✔
2845
  SRpcMsg  rpcRsp = {0};
185✔
2846
  SRpcInit rpcInit = {0};
185✔
2847
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
185✔
2848

2849
  rpcInit.label = "CHK";
185✔
2850
  rpcInit.numOfThreads = 1;
185✔
2851
  rpcInit.cfp = NULL;
185✔
2852
  rpcInit.sessions = 16;
185✔
2853
  rpcInit.connType = TAOS_CONN_CLIENT;
185✔
2854
  rpcInit.idleTime = tsShellActivityTimer * 1000;
185✔
2855
  rpcInit.compressSize = tsCompressMsgSize;
185✔
2856
  rpcInit.user = "_dnd";
185✔
2857

2858
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
185✔
2859
  connLimitNum = TMAX(connLimitNum, 10);
185✔
2860
  connLimitNum = TMIN(connLimitNum, 500);
185✔
2861
  rpcInit.connLimitNum = connLimitNum;
185✔
2862
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
185✔
2863
  rpcInit.readTimeout = tsReadTimeout;
185✔
2864
  rpcInit.ipv6 = tsEnableIpv6;
185✔
2865
  rpcInit.enableSSL = tsEnableTLS;
185✔
2866

2867
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
185✔
2868
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
185✔
2869
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
185✔
2870
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
185✔
2871
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
185✔
2872

2873
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
185✔
2874
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2875
    goto _OVER;
×
2876
  }
2877

2878
  clientRpc = rpcOpen(&rpcInit);
185✔
2879
  if (clientRpc == NULL) {
185✔
2880
    code = terrno;
×
2881
    tscError("failed to init server status client since %s", tstrerror(code));
×
2882
    goto _OVER;
×
2883
  }
2884

2885
  if (fqdn == NULL) {
185✔
2886
    fqdn = tsLocalFqdn;
185✔
2887
  }
2888

2889
  if (port == 0) {
185✔
2890
    port = tsServerPort;
185✔
2891
  }
2892

2893
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
185✔
2894
  epSet.eps[0].port = (uint16_t)port;
185✔
2895
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
185✔
2896
  if (TSDB_CODE_SUCCESS != ret) {
185✔
2897
    tscError("failed to send recv since %s", tstrerror(ret));
×
2898
    goto _OVER;
×
2899
  }
2900

2901
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
185✔
2902
    tscError("failed to send server status req since %s", terrstr());
43✔
2903
    goto _OVER;
43✔
2904
  }
2905

2906
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
142✔
2907
    tscError("failed to parse server status rsp since %s", terrstr());
×
2908
    goto _OVER;
×
2909
  }
2910

2911
  code = statusRsp.statusCode;
142✔
2912
  if (details != NULL) {
142✔
2913
    tstrncpy(details, statusRsp.details, maxlen);
142✔
2914
  }
2915

2916
_OVER:
171✔
2917
  if (clientRpc != NULL) {
185✔
2918
    rpcClose(clientRpc);
185✔
2919
  }
2920
  if (rpcRsp.pCont != NULL) {
185✔
2921
    rpcFreeCont(rpcRsp.pCont);
142✔
2922
  }
2923
  return code;
185✔
2924
}
2925

2926
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
1,186✔
2927
                      int32_t acctId, char* db) {
2928
  SName name = {0};
1,186✔
2929

2930
  if (len1 <= 0) {
1,186✔
2931
    return -1;
×
2932
  }
2933

2934
  const char* dbName = db;
1,186✔
2935
  const char* tbName = NULL;
1,186✔
2936
  int32_t     dbLen = 0;
1,186✔
2937
  int32_t     tbLen = 0;
1,186✔
2938
  if (len2 > 0) {
1,186✔
2939
    dbName = str + pos1;
×
2940
    dbLen = len1;
×
2941
    tbName = str + pos2;
×
2942
    tbLen = len2;
×
2943
  } else {
2944
    dbLen = strlen(db);
1,186✔
2945
    tbName = str + pos1;
1,186✔
2946
    tbLen = len1;
1,186✔
2947
  }
2948

2949
  if (dbLen <= 0 || tbLen <= 0) {
1,186✔
2950
    return -1;
×
2951
  }
2952

2953
  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
1,186✔
2954
    return -1;
×
2955
  }
2956

2957
  if (tNameAddTbName(&name, tbName, tbLen)) {
1,186✔
2958
    return -1;
×
2959
  }
2960

2961
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
1,186✔
2962
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);
1,186✔
2963

2964
  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
1,186✔
2965
  if (pDb) {
1,186✔
2966
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
×
2967
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
2968
    }
2969
  } else {
2970
    STablesReq db;
1,186✔
2971
    db.pTables = taosArrayInit(20, sizeof(SName));
1,186✔
2972
    if (NULL == db.pTables) {
1,186✔
2973
      return terrno;
×
2974
    }
2975
    tstrncpy(db.dbFName, dbFName, TSDB_DB_FNAME_LEN);
1,186✔
2976
    if (NULL == taosArrayPush(db.pTables, &name)) {
2,372✔
2977
      return terrno;
×
2978
    }
2979
    TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
1,186✔
2980
  }
2981

2982
  return TSDB_CODE_SUCCESS;
1,186✔
2983
}
2984

2985
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
1,186✔
2986
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
1,186✔
2987
  if (NULL == pHash) {
1,186✔
2988
    return terrno;
×
2989
  }
2990

2991
  bool    inEscape = false;
1,186✔
2992
  int32_t code = 0;
1,186✔
2993
  void*   pIter = NULL;
1,186✔
2994

2995
  int32_t vIdx = 0;
1,186✔
2996
  int32_t vPos[2];
1,186✔
2997
  int32_t vLen[2];
1,186✔
2998

2999
  (void)memset(vPos, -1, sizeof(vPos));
1,186✔
3000
  (void)memset(vLen, 0, sizeof(vLen));
1,186✔
3001

3002
  for (int32_t i = 0;; ++i) {
5,930✔
3003
    if (0 == *(tbList + i)) {
5,930✔
3004
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
1,186✔
3005
        vLen[vIdx] = i - vPos[vIdx];
1,186✔
3006
      }
3007

3008
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
1,186✔
3009
      if (code) {
1,186✔
3010
        goto _return;
×
3011
      }
3012

3013
      break;
1,186✔
3014
    }
3015

3016
    if ('`' == *(tbList + i)) {
4,744✔
3017
      inEscape = !inEscape;
×
3018
      if (!inEscape) {
×
3019
        if (vPos[vIdx] >= 0) {
×
3020
          vLen[vIdx] = i - vPos[vIdx];
×
3021
        } else {
3022
          goto _return;
×
3023
        }
3024
      }
3025

3026
      continue;
×
3027
    }
3028

3029
    if (inEscape) {
4,744✔
3030
      if (vPos[vIdx] < 0) {
×
3031
        vPos[vIdx] = i;
×
3032
      }
3033
      continue;
×
3034
    }
3035

3036
    if ('.' == *(tbList + i)) {
4,744✔
3037
      if (vPos[vIdx] < 0) {
×
3038
        goto _return;
×
3039
      }
3040
      if (vLen[vIdx] <= 0) {
×
3041
        vLen[vIdx] = i - vPos[vIdx];
×
3042
      }
3043
      vIdx++;
×
3044
      if (vIdx >= 2) {
×
3045
        goto _return;
×
3046
      }
3047
      continue;
×
3048
    }
3049

3050
    if (',' == *(tbList + i)) {
4,744✔
3051
      if (vPos[vIdx] < 0) {
×
3052
        goto _return;
×
3053
      }
3054
      if (vLen[vIdx] <= 0) {
×
3055
        vLen[vIdx] = i - vPos[vIdx];
×
3056
      }
3057

3058
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
×
3059
      if (code) {
×
3060
        goto _return;
×
3061
      }
3062

3063
      (void)memset(vPos, -1, sizeof(vPos));
×
3064
      (void)memset(vLen, 0, sizeof(vLen));
×
3065
      vIdx = 0;
×
3066
      continue;
×
3067
    }
3068

3069
    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
4,744✔
3070
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
×
3071
        vLen[vIdx] = i - vPos[vIdx];
×
3072
      }
3073
      continue;
×
3074
    }
3075

3076
    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
4,744✔
3077
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
593✔
3078
      if (vLen[vIdx] > 0) {
4,744✔
3079
        goto _return;
×
3080
      }
3081
      if (vPos[vIdx] < 0) {
4,744✔
3082
        vPos[vIdx] = i;
1,186✔
3083
      }
3084
      continue;
4,744✔
3085
    }
3086

3087
    goto _return;
×
3088
  }
3089

3090
  int32_t dbNum = taosHashGetSize(pHash);
1,186✔
3091
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
1,186✔
3092
  if (NULL == pReq) {
1,186✔
3093
    TSC_ERR_JRET(terrno);
×
3094
  }
3095
  pIter = taosHashIterate(pHash, NULL);
1,186✔
3096
  while (pIter) {
2,372✔
3097
    STablesReq* pDb = (STablesReq*)pIter;
1,186✔
3098
    if (NULL == taosArrayPush(*pReq, pDb)) {
2,372✔
3099
      TSC_ERR_JRET(terrno);
×
3100
    }
3101
    pIter = taosHashIterate(pHash, pIter);
1,186✔
3102
  }
3103

3104
  taosHashCleanup(pHash);
1,186✔
3105

3106
  return TSDB_CODE_SUCCESS;
1,186✔
3107

3108
_return:
×
3109

3110
  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
×
3111

3112
  pIter = taosHashIterate(pHash, NULL);
×
3113
  while (pIter) {
×
3114
    STablesReq* pDb = (STablesReq*)pIter;
×
3115
    taosArrayDestroy(pDb->pTables);
×
3116
    pIter = taosHashIterate(pHash, pIter);
×
3117
  }
3118

3119
  taosHashCleanup(pHash);
×
3120

3121
  return terrno;
×
3122
}
3123

3124
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
1,186✔
3125
  SSyncQueryParam* pParam = param;
1,186✔
3126
  pParam->pRequest->code = code;
1,186✔
3127

3128
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
1,186✔
3129
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3130
  }
3131
}
1,186✔
3132

3133
void syncQueryFn(void* param, void* res, int32_t code) {
913,692,342✔
3134
  SSyncQueryParam* pParam = param;
913,692,342✔
3135
  pParam->pRequest = res;
913,692,342✔
3136

3137
  if (pParam->pRequest) {
913,697,487✔
3138
    pParam->pRequest->code = code;
913,711,897✔
3139
    clientOperateReport(pParam->pRequest);
913,722,690✔
3140
  }
3141

3142
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
913,662,707✔
3143
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3144
  }
3145
}
913,749,886✔
3146

3147
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
913,366,982✔
3148
                        int8_t source) {
3149
  if (sql == NULL || NULL == fp) {
913,366,982✔
3150
    terrno = TSDB_CODE_INVALID_PARA;
608✔
3151
    if (fp) {
×
3152
      fp(param, NULL, terrno);
×
3153
    }
3154

3155
    return;
188✔
3156
  }
3157

3158
  size_t sqlLen = strlen(sql);
913,369,408✔
3159
  if (sqlLen > (size_t)tsMaxSQLLength) {
913,369,408✔
3160
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, tsMaxSQLLength);
492✔
3161
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
492✔
3162
    fp(param, NULL, terrno);
492✔
3163
    return;
492✔
3164
  }
3165

3166
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);
913,368,916✔
3167

3168
  SRequestObj* pRequest = NULL;
913,368,916✔
3169
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
913,375,676✔
3170
  if (code != TSDB_CODE_SUCCESS) {
913,371,935✔
3171
    terrno = code;
1,328✔
3172
    fp(param, NULL, terrno);
1,328✔
3173
    return;
1,328✔
3174
  }
3175

3176
  code = connCheckAndUpateMetric(connId);
913,370,607✔
3177
  if (code != TSDB_CODE_SUCCESS) {
913,368,802✔
3178
    terrno = code;
462✔
3179
    fp(param, NULL, terrno);
462✔
3180
    return;
462✔
3181
  }
3182

3183
  pRequest->source = source;
913,368,340✔
3184
  pRequest->body.queryFp = fp;
913,370,218✔
3185
  doAsyncQuery(pRequest, false);
913,366,851✔
3186
}
3187

3188
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
205✔
3189
                                 int64_t reqid) {
3190
  if (sql == NULL || NULL == fp) {
205✔
3191
    terrno = TSDB_CODE_INVALID_PARA;
×
3192
    if (fp) {
×
3193
      fp(param, NULL, terrno);
×
3194
    }
3195

3196
    return;
×
3197
  }
3198

3199
  size_t sqlLen = strlen(sql);
205✔
3200
  if (sqlLen > (size_t)tsMaxSQLLength) {
205✔
3201
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid, tsMaxSQLLength);
×
3202
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
3203
    fp(param, NULL, terrno);
×
3204
    return;
×
3205
  }
3206

3207
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);
205✔
3208

3209
  SRequestObj* pRequest = NULL;
205✔
3210
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
205✔
3211
  if (code != TSDB_CODE_SUCCESS) {
205✔
3212
    terrno = code;
×
3213
    fp(param, NULL, terrno);
×
3214
    return;
×
3215
  }
3216

3217
  code = connCheckAndUpateMetric(connId);
205✔
3218

3219
  if (code != TSDB_CODE_SUCCESS) {
205✔
3220
    terrno = code;
×
3221
    fp(param, NULL, terrno);
×
3222
    return;
×
3223
  }
3224

3225
  pRequest->body.queryFp = fp;
205✔
3226

3227
  doAsyncQuery(pRequest, false);
205✔
3228
}
3229

3230
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
913,239,046✔
3231
  if (NULL == taos) {
913,239,046✔
3232
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3233
    return NULL;
×
3234
  }
3235

3236
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
913,239,046✔
3237
  if (NULL == param) {
913,247,636✔
3238
    return NULL;
×
3239
  }
3240

3241
  int32_t code = tsem_init(&param->sem, 0, 0);
913,247,636✔
3242
  if (TSDB_CODE_SUCCESS != code) {
913,239,728✔
3243
    taosMemoryFree(param);
×
3244
    return NULL;
×
3245
  }
3246

3247
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
913,239,728✔
3248
  code = tsem_wait(&param->sem);
913,217,686✔
3249
  if (TSDB_CODE_SUCCESS != code) {
913,228,099✔
3250
    taosMemoryFree(param);
×
3251
    return NULL;
×
3252
  }
3253
  code = tsem_destroy(&param->sem);
913,228,099✔
3254
  if (TSDB_CODE_SUCCESS != code) {
913,229,881✔
3255
    tscError("failed to destroy semaphore since %s", tstrerror(code));
×
3256
  }
3257

3258
  SRequestObj* pRequest = NULL;
913,225,147✔
3259
  if (param->pRequest != NULL) {
913,225,147✔
3260
    param->pRequest->syncQuery = true;
913,223,204✔
3261
    pRequest = param->pRequest;
913,224,516✔
3262
    param->pRequest->inCallback = false;
913,224,532✔
3263
  }
3264
  taosMemoryFree(param);
913,225,006✔
3265

3266
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3267
  //          *(int64_t*)taos, pRequest);
3268

3269
  return pRequest;
913,240,065✔
3270
}
3271

3272
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
205✔
3273
  if (NULL == taos) {
205✔
3274
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3275
    return NULL;
×
3276
  }
3277

3278
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
205✔
3279
  if (param == NULL) {
205✔
3280
    return NULL;
×
3281
  }
3282
  int32_t code = tsem_init(&param->sem, 0, 0);
205✔
3283
  if (TSDB_CODE_SUCCESS != code) {
205✔
3284
    taosMemoryFree(param);
×
3285
    return NULL;
×
3286
  }
3287

3288
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
205✔
3289
  code = tsem_wait(&param->sem);
205✔
3290
  if (TSDB_CODE_SUCCESS != code) {
205✔
3291
    taosMemoryFree(param);
×
3292
    return NULL;
×
3293
  }
3294
  SRequestObj* pRequest = NULL;
205✔
3295
  if (param->pRequest != NULL) {
205✔
3296
    param->pRequest->syncQuery = true;
205✔
3297
    pRequest = param->pRequest;
205✔
3298
  }
3299
  taosMemoryFree(param);
205✔
3300

3301
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3302
  //   *(int64_t*)taos, pRequest);
3303

3304
  return pRequest;
205✔
3305
}
3306

3307
static void fetchCallback(void* pResult, void* param, int32_t code) {
252,388,049✔
3308
  SRequestObj* pRequest = (SRequestObj*)param;
252,388,049✔
3309

3310
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
252,388,049✔
3311

3312
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
252,388,049✔
3313
           tstrerror(code), pRequest->requestId);
3314

3315
  pResultInfo->pData = pResult;
252,388,010✔
3316
  pResultInfo->numOfRows = 0;
252,388,006✔
3317

3318
  if (code != TSDB_CODE_SUCCESS) {
252,387,983✔
3319
    pRequest->code = code;
×
3320
    taosMemoryFreeClear(pResultInfo->pData);
×
3321
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3322
    return;
×
3323
  }
3324

3325
  if (pRequest->code != TSDB_CODE_SUCCESS) {
252,387,983✔
3326
    taosMemoryFreeClear(pResultInfo->pData);
×
3327
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3328
    return;
×
3329
  }
3330

3331
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
256,985,661✔
3332
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
252,388,010✔
3333
  if (pRequest->code != TSDB_CODE_SUCCESS) {
252,386,554✔
3334
    pResultInfo->numOfRows = 0;
63✔
3335
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
63✔
3336
             tstrerror(pRequest->code), pRequest->requestId);
3337
  } else {
3338
    tscDebug(
252,386,474✔
3339
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3340
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3341

3342
    STscObj*            pTscObj = pRequest->pTscObj;
252,386,474✔
3343
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
252,387,986✔
3344
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
252,387,986✔
3345
  }
3346

3347
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
252,388,047✔
3348
}
3349

3350
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
283,459,079✔
3351
  pRequest->body.fetchFp = fp;
283,459,079✔
3352
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
283,459,079✔
3353

3354
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
283,459,079✔
3355

3356
  // this query has no results or error exists, return directly
3357
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
283,459,079✔
3358
    pResultInfo->numOfRows = 0;
×
3359
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
×
3360
    return;
2,002,296✔
3361
  }
3362

3363
  // all data has returned to App already, no need to try again
3364
  if (pResultInfo->completed) {
283,459,079✔
3365
    // it is a local executed query, no need to do async fetch
3366
    if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
31,071,030✔
3367
      if (pResultInfo->localResultFetched) {
1,382,308✔
3368
        pResultInfo->numOfRows = 0;
691,154✔
3369
        pResultInfo->current = 0;
691,154✔
3370
      } else {
3371
        pResultInfo->localResultFetched = true;
691,154✔
3372
      }
3373
    } else {
3374
      pResultInfo->numOfRows = 0;
29,688,722✔
3375
    }
3376

3377
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
31,071,030✔
3378
    return;
31,071,030✔
3379
  }
3380

3381
  SSchedulerReq req = {
252,388,049✔
3382
      .syncReq = false,
3383
      .fetchFp = fetchCallback,
3384
      .cbParam = pRequest,
3385
  };
3386

3387
  int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
252,388,049✔
3388
  if (TSDB_CODE_SUCCESS != code) {
252,387,983✔
3389
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
×
3390
    // pRequest->body.fetchFp(param, pRequest, code);
3391
  }
3392
}
3393

3394
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
913,703,647✔
3395
  pRequest->inCallback = true;
913,703,647✔
3396
  int64_t this = pRequest->self;
913,713,126✔
3397
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
913,693,929✔
3398
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
18,648✔
3399
    code = TSDB_CODE_SUCCESS;
×
3400
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
3401
  }
3402

3403
  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
913,693,929✔
3404
           pRequest);
3405

3406
  if (pRequest->body.queryFp != NULL) {
913,694,396✔
3407
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
913,721,382✔
3408
  }
3409

3410
  SRequestObj* pReq = acquireRequest(this);
913,759,125✔
3411
  if (pReq != NULL) {
913,778,952✔
3412
    pReq->inCallback = false;
912,051,592✔
3413
    (void)releaseRequest(this);
912,054,478✔
3414
  }
3415
}
913,769,178✔
3416

3417
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
503,802✔
3418
                       SParseSqlRes* pRes) {
3419
#ifndef TD_ENTERPRISE
3420
  return TSDB_CODE_SUCCESS;
3421
#else
3422
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
503,802✔
3423
#endif
3424
}
3425

3426
void updateConnAccessInfo(SConnAccessInfo* pInfo) {
996,252,272✔
3427
  if (pInfo == NULL) {
996,252,272✔
3428
    return;
×
3429
  }
3430
  int64_t ts = taosGetTimestampMs();
996,271,852✔
3431
  if (pInfo->startTime == 0) {
996,271,852✔
3432
    pInfo->startTime = ts;
82,904,580✔
3433
  }
3434
  pInfo->lastAccessTime = ts;
996,268,404✔
3435
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc