• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4935

22 Jan 2026 06:38AM UTC coverage: 66.708% (+0.02%) from 66.691%
#4935

push

travis-ci

web-flow
merge: from main to 3.0 #34371

121 of 271 new or added lines in 17 files covered. (44.65%)

9066 existing lines in 149 files now uncovered.

203884 of 305637 relevant lines covered (66.71%)

125811266.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.8
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "command.h"
22
#include "decimal.h"
23
#include "scheduler.h"
24
#include "tdatablock.h"
25
#include "tdataformat.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tmisce.h"
29
#include "tmsg.h"
30
#include "tmsgtype.h"
31
#include "tpagedbuf.h"
32
#include "tref.h"
33
#include "tsched.h"
34
#include "tversion.h"
35

36
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
37
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode);
38

39
void setQueryRequest(int64_t rId) {
418,336,117✔
40
  SRequestObj* pReq = acquireRequest(rId);
418,336,117✔
41
  if (pReq != NULL) {
418,337,286✔
42
    pReq->isQuery = true;
418,324,906✔
43
    (void)releaseRequest(rId);
418,324,865✔
44
  }
45
}
418,337,140✔
46

47
// Returns true when `str` is a non-NULL, non-empty string of at most `maxsize` characters.
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (NULL == str) return false;

  const size_t numOfChars = strlen(str);
  return numOfChars >= 1 && numOfChars <= maxsize;
}
59

60
// A user name is valid when it is non-empty and at most TSDB_USER_LEN - 1 characters long.
static bool validateUserName(const char* user) {
  const size_t maxUserLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxUserLen);
}
2,376,384✔
61

62
// A password is valid when it is non-empty and at most TSDB_PASSWORD_MAX_LEN characters long.
static bool validatePassword(const char* passwd) {
  const size_t maxPasswdLen = TSDB_PASSWORD_MAX_LEN;
  return stringLengthCheck(passwd, maxPasswdLen);
}
2,377,470✔
63

64
// A database name is valid when it is non-empty and at most TSDB_DB_NAME_LEN - 1 characters long.
static bool validateDbName(const char* db) {
  const size_t maxDbNameLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxDbNameLen);
}
496,043✔
65

66
// Builds the key that identifies an application instance: "user:auth:ip:port", or
// "auth:ip:port" when no user is given. The text is formatted into a 512-byte stack buffer
// (longer keys are truncated by snprintf) and returned as a taosStrdup() copy.
// NOTE(review): `ip` (and possibly `auth`) can reach the "%s" conversions as NULL, which the C
// standard leaves undefined; confirm every supported libc tolerates this.
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char buf[512] = {0};

  if (NULL != user) {
    (void)snprintf(buf, sizeof(buf), "%s:%s:%s:%d", user, auth, ip, port);
  } else {
    (void)snprintf(buf, sizeof(buf), "%s:%s:%d", auth, ip, port);
  }

  return taosStrdup(buf);
}
75

76
// Copies `srcLength` bytes of `src` into `dst`, replacing each of the characters
// '"', '\\', '\b', '\f', '\n', '\r' and '\t' with its two-character backslash escape.
//
// dst          destination buffer; NOT NUL-terminated by this function
// maxDstLength number of bytes that may be written to `dst`
// src          source bytes (embedded NULs are copied verbatim)
// srcLength    number of bytes to read from `src`
//
// Returns the number of bytes written to `dst`; 0 when `dst` or `src` is NULL or `srcLength` is 0.
// Output never exceeds `maxDstLength`: conversion stops in front of the first character (or escape
// pair) that no longer fits, so an escape sequence is never split.
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
  if (dst == NULL || src == NULL || srcLength == 0) {
    return 0;
  }

  size_t dstLength = 0;
  for (size_t i = 0; i < srcLength; ++i) {
    // `escaped` holds the character that follows the backslash, or 0 when src[i] is copied as is.
    char escaped = 0;
    switch (src[i]) {
      case '\"':
        escaped = '\"';
        break;
      case '\\':
        escaped = '\\';
        break;
      case '\b':
        escaped = 'b';
        break;
      case '\f':
        escaped = 'f';
        break;
      case '\n':
        escaped = 'n';
        break;
      case '\r':
        escaped = 'r';
        break;
      case '\t':
        escaped = 't';
        break;
      default:
        break;
    }

    // dstLength <= maxDstLength always holds here, so the subtraction cannot wrap.
    size_t needed = (escaped != 0) ? 2 : 1;
    if (maxDstLength - dstLength < needed) {
      break;
    }

    if (escaped != 0) {
      dst[dstLength++] = '\\';
      dst[dstLength++] = escaped;
    } else {
      dst[dstLength++] = src[i];
    }
  }

  return (int32_t)dstLength;
}
132

133
bool chkRequestKilled(void* param) {
2,147,483,647✔
134
  bool         killed = false;
2,147,483,647✔
135
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
136
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647✔
137
    killed = true;
63✔
138
  }
139

140
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
141

142
  return killed;
2,147,483,647✔
143
}
144

145
void cleanupAppInfo() {
1,232,348✔
146
  taosHashCleanup(appInfo.pInstMap);
1,232,348✔
147
  taosHashCleanup(appInfo.pInstMapByClusterId);
1,232,348✔
148
  tscInfo("cluster instance map cleaned");
1,232,348✔
149
}
1,232,348✔
150

151
static int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db,
152
                               __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType,
153
                               STscObj** pTscObj);
154

155
// Validates the connection arguments, finds or creates the application instance for the cluster
// key while holding appInfo.mutex, and then hands off to taosConnectImpl().
//
// ip       server end point; when NULL the end point set is built from tsFirst/tsSecond
// user     user name; when NULL, `auth` must be a token of exactly TSDB_TOKEN_LEN - 1 characters
// auth     credential string; part of the cluster key, passed to openTransporter()/taosConnectImpl()
// totp     optional decimal TOTP code in [0, 999999]; when NULL, -1 is forwarded
// db       optional database name; when non-empty it is copied into a local buffer and de-quoted
// port     when non-zero, overrides the port of end points 0 and 1
// connType forwarded unchanged to taosConnectImpl()
// pObj     forwarded unchanged to taosConnectImpl()
//
// Returns the first failing error code, otherwise the result of taosConnectImpl().
int32_t taos_connect_by_auth(const char* ip, const char* user, const char* auth, const char* totp,
                             const char* db, uint16_t port, int connType, STscObj** pObj) {
  TSC_ERR_RET(taos_init());

  if (user == NULL) {
    if (auth == NULL || strlen(auth) != (TSDB_TOKEN_LEN - 1)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOKEN);
    }
  } else if (!validateUserName(user)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
  }
  int32_t code = 0;

  char localDb[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL && strlen(db) > 0) {
    if (!validateDbName(db)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
    }

    tstrncpy(localDb, db, sizeof(localDb));
    (void)strdequote(localDb);
  }

  int32_t totpCode = -1;
  if (totp != NULL) {
    char* endptr = NULL;
    totpCode = taosStr2Int32(totp, &endptr, 10);
    // reject empty input, trailing garbage and values outside the 6-digit range
    if (endptr == totp || *endptr != '\0' || totpCode < 0 || totpCode > 999999) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOTP_CODE);
    }
  }

  SCorEpSet epSet = {0};
  if (ip) {
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
  } else {
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
  }

  if (port) {
    epSet.epSet.eps[0].port = port;
    epSet.epSet.eps[1].port = port;
  }

  char* key = getClusterKey(user, auth, ip, port);
  if (NULL == key) {
    TSC_ERR_RET(terrno);
  }
  // `db` may legitimately be NULL; never hand a NULL pointer to a "%s" conversion.
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
          user ? user : "", db ? db : "", key);
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
  }

  SAppInstInfo** pInst = NULL;
  // Instance handed to taosConnectImpl(); captured while appInfo.mutex is held so that the
  // hash-map slot behind `pInst` is not dereferenced after the lock has been dropped.
  SAppInstInfo* pAppInst = NULL;

  code = taosThreadMutexLock(&appInfo.mutex);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    // `key` is still owned by this function on this path; release it before bailing out.
    taosMemoryFree(key);
    TSC_ERR_RET(code);
  }

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
  SAppInstInfo* p = NULL;
  if (pInst == NULL) {
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
    if (NULL == p) {
      TSC_ERR_JRET(terrno);
    }
    p->mgmtEp = epSet;
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = openTransporter(user, auth, tsNumOfCores / 2, &p->pTransporter);
    if (TSDB_CODE_SUCCESS != code) {
      // NOTE(review): qnodeMutex was initialized above and is not destroyed on this path.
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    // ownership of `key` moves to the instance; clear the local so it is not freed below
    p->instKey = key;
    key = NULL;
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user ? user : "", epSet.epSet.eps[0].fqdn,
            epSet.epSet.eps[0].port);

    pAppInst = p;
  } else {
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
    }
    // reset to 0 in case a connection reuses the key of a user that has since been dropped
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
    pAppInst = *pInst;
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    (void)taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    return code;
  }

  code = taosThreadMutexUnlock(&appInfo.mutex);
  taosMemoryFreeClear(key);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    return code;
  }
  return taosConnectImpl(user, auth, totpCode, localDb, NULL, NULL, pAppInst, connType, pObj);
}
275

276
// Password-based connect: checks the password length, converts the password with
// taosEncryptPass_c() into a local buffer and delegates to taos_connect_by_auth().
// All other arguments are forwarded unchanged.
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
                              uint16_t port, int connType, STscObj** pObj) {
  if (!validatePassword(pass)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
  }

  char encrypted[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t*)pass, strlen(pass), encrypted);

  return taos_connect_by_auth(ip, user, encrypted, totp, db, port, connType, pObj);
}
286

287
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
288
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
289
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
290
//     return *ppAppInstInfo;
291
//   } else {
292
//     return NULL;
293
//   }
294
// }
295

296
// Destroys the semaphore embedded in `param` and frees the structure. NULL is accepted and ignored.
// A failure to destroy the semaphore is logged; the memory is freed regardless.
void freeQueryParam(SSyncQueryParam* param) {
  if (NULL != param) {
    if (tsem_destroy(&param->sem) != TSDB_CODE_SUCCESS) {
      tscError("failed to destroy semaphore in freeQueryParam");
    }
    taosMemoryFree(param);
  }
}
303

304
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
809,580,737✔
305
                     SRequestObj** pRequest, int64_t reqid) {
306
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
809,580,737✔
307
  if (TSDB_CODE_SUCCESS != code) {
809,581,715✔
308
    tscError("failed to malloc sqlObj, %s", sql);
×
309
    return code;
×
310
  }
311

312
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
809,581,715✔
313
  if ((*pRequest)->sqlstr == NULL) {
809,579,519✔
314
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
315
    destroyRequest(*pRequest);
×
316
    *pRequest = NULL;
×
317
    return terrno;
×
318
  }
319

320
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
809,582,214✔
321
  (*pRequest)->sqlstr[sqlLen] = 0;
809,590,008✔
322
  (*pRequest)->sqlLen = sqlLen;
809,590,825✔
323
  (*pRequest)->validateOnly = validateSql;
809,590,233✔
324
  (*pRequest)->stmtBindVersion = 0;
809,590,118✔
325

326
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
809,590,428✔
327

328
  STscObj* pTscObj = (*pRequest)->pTscObj;
809,589,147✔
329
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
809,589,903✔
330
                             sizeof((*pRequest)->self));
331
  if (err) {
809,589,212✔
332
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
333
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
334
    destroyRequest(*pRequest);
×
335
    *pRequest = NULL;
×
336
    return terrno;
×
337
  }
338

339
  (*pRequest)->allocatorRefId = -1;
809,589,212✔
340
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
809,586,204✔
341
    if (TSDB_CODE_SUCCESS !=
258,384,723✔
342
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
258,378,118✔
343
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
344
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
345
      destroyRequest(*pRequest);
×
346
      *pRequest = NULL;
×
347
      return terrno;
×
348
    }
349
  }
350

351
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
809,594,091✔
352
  return TSDB_CODE_SUCCESS;
809,580,929✔
353
}
354

355
// Builds a sub-request for `sql` on the same connection as `pRequest` and links the two:
// the new request becomes `pRequest`'s previous request, and `pRequest` becomes both the next
// and the user request of the new one. Returns the code from buildRequest().
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
  int32_t code =
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pPrev = *pNewRequest;
  pRequest->relation.prevRefId = pPrev->self;
  pPrev->relation.nextRefId = pRequest->self;
  pPrev->relation.userRefId = pRequest->self;
  pPrev->isSubReq = true;

  return code;
}
366

367
// Parses the SQL text stored in `pRequest` into `*pQuery`.
//
// A parse context is filled from the request and its connection, the catalog handle for the
// connection's cluster is looked up, and qParseSql() is run. On success with a result set the
// result schema and precision are recorded in the request. On success, or on an error for which
// NEED_CLIENT_HANDLE_ERROR() holds, the db/table/target-table lists are swapped from the query
// into the request. Returns the resulting code.
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj*      pConn = pRequest->pTscObj;
  SAppInstInfo* pInst = pConn->pAppInfo;

  SParseContext ctx = {
      .requestId = pRequest->requestId,
      .requestRid = pRequest->self,
      .acctId = pConn->acctId,
      .db = pRequest->pDb,
      .topicQuery = topicQuery,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pInst->pTransporter,
      .pStmtCb = pStmtCb,
      .pUser = pConn->user,
      .userId = pConn->userId,
      .isSuperUser = (0 == strcmp(pConn->user, TSDB_DEFAULT_USER)),
      .enableSysInfo = pConn->sysInfo,
      .svrVer = pConn->sVer,
      .nodeOffline = (pInst->onlineDnodes < pInst->totalDnodes),
      .stmtBindVersion = pRequest->stmtBindVersion,
      .setQueryFp = setQueryRequest,
      .timezone = pConn->optionInfo.timezone,
      .charsetCxt = pConn->optionInfo.charsetCxt,
  };

  ctx.mgmtEpSet = getEpSet_s(&pInst->mgmtEp);

  int32_t code = catalogGetHandle(pInst->clusterId, &ctx.pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = qParseSql(&ctx, pQuery);
  if (TSDB_CODE_SUCCESS == code && (*pQuery)->haveResultSet) {
    code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols,
                            (*pQuery)->pResExtSchema, pRequest->stmtBindVersion > 0);
    // the precision is recorded regardless of the schema result
    setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
  }

  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
  }

  taosArrayDestroy(ctx.pTableMetaPos);
  taosArrayDestroy(ctx.pTableVgroupPos);

  return code;
}
420

421
// Runs a locally executable command through qExecCommand() and, when it succeeds and yields a
// response, stores that response as the request's query result. Returns the resulting code.
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  STscObj*           pConn = pRequest->pTscObj;
  SRetrieveTableRsp* pRsp = NULL;
  int8_t             biMode = atomic_load_8(&pConn->biMode);

  int32_t code =
      qExecCommand(&pConn->id, pConn->sysInfo, pQuery->pRoot, &pRsp, biMode, pConn->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS != code || NULL == pRsp) {
    return code;
  }

  return setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
                               pRequest->stmtBindVersion > 0);
}
433

434
// Sends the command message prepared in `pQuery` to the server and blocks on the request's
// response semaphore. A query without a command message (e.g. "drop table if exists
// not_exists_table") is treated as an immediate success.
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (NULL == pCmd) {
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  // the request now holds the buffer; clear the source pointer (pMsg transferred to SMsgSendInfo management)
  pCmd->pMsg = NULL;

  STscObj*      pConn = pRequest->pTscObj;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  TSC_ERR_RET(asyncSendMsgToServer(pConn->pAppInfo->pTransporter, &pCmd->epSet, NULL, pSendInfo));
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
  return TSDB_CODE_SUCCESS;
}
453

454
// Returns the application instance that the request's connection belongs to.
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
  STscObj* pConn = pRequest->pTscObj;
  return pConn->pAppInfo;
}
1,439,475,781✔
455

456
// Asynchronous variant of local command execution: runs the command, stores its result (if any)
// in the request, records the final code on the request and invokes the request callback.
// In validate-only mode the callback is invoked with code 0 and nothing is executed.
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  STscObj*           pConn = pRequest->pTscObj;
  SReqResultInfo*    pResInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRsp = NULL;

  int32_t code = qExecCommand(&pConn->id, pConn->sysInfo, pQuery->pRoot, &pRsp, atomic_load_8(&pConn->biMode),
                              pConn->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
    code = setQueryResultFromRsp(pResInfo, pRsp, pResInfo->convertUcs4, pRequest->stmtBindVersion > 0);
  }

  pRequest->code = code;

  if (TSDB_CODE_SUCCESS == pRequest->code) {
    tscDebug(
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
        pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
  } else {
    pResInfo->numOfRows = 0;
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
             pRequest->requestId);
  }

  doRequestCallback(pRequest, code);
}
485

486
// Asynchronously sends the command message prepared in `pQuery` to the server.
// In validate-only mode, or when there is no command message (e.g. "drop table if exists
// not_exists_table"), the request callback is invoked with code 0 and success is returned.
// When the send itself fails, the callback is invoked with that code, which is also returned.
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly || NULL == pQuery->pCmdMsg) {
    doRequestCallback(pRequest, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  // the request now holds the buffer; clear the source pointer (pMsg transferred to SMsgSendInfo management)
  pCmd->pMsg = NULL;

  SAppInstInfo* pInst = getAppInfo(pRequest);
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int32_t code = asyncSendMsgToServer(pInst->pTransporter, &pCmd->epSet, NULL, pSendInfo);
  if (0 != code) {
    doRequestCallback(pRequest, code);
  }
  return code;
}
512

513
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
130,244✔
514
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
130,244✔
515
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
130,244✔
516

517
  if (node1->load < node2->load) {
130,244✔
518
    return -1;
×
519
  }
520

521
  return node1->load > node2->load;
130,244✔
522
}
523

524
// Replaces the cached qnode list of `pInfo` with a load-sorted copy of `pNodeList`, under
// pInfo->qnodeMutex. The previous list is always destroyed; when `pNodeList` is NULL the cache
// is simply left empty.
//
// Returns TSDB_CODE_SUCCESS, a mutex lock/unlock error, or the code read from terrno when
// duplicating the list fails (in which case the cache is left empty and no sort is attempted).
// NOTE(review): assumes taosArrayDup() reports failure by returning NULL with terrno set — confirm.
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  int32_t code = TSDB_CODE_SUCCESS;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    taosArrayDestroy(pInfo->pQnodeList);
    pInfo->pQnodeList = NULL;
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
  }

  if (pNodeList) {
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
    if (NULL == pInfo->pQnodeList) {
      code = terrno;
    } else {
      taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
      // cast so the argument matches the conversion regardless of the size type's width
      tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%d", pInfo->clusterId,
               (int32_t)taosArrayGetSize(pInfo->pQnodeList));
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    // report the duplication failure; the unlock result is secondary on this path
    (void)taosThreadMutexUnlock(&pInfo->qnodeMutex);
    return code;
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));

  return TSDB_CODE_SUCCESS;
}
542

543
// Tells the caller whether a qnode list still has to be fetched for this request.
// Under the vnode and client query policies `*required` is always false. Otherwise it is true
// exactly when the application instance has no cached qnode list (checked under qnodeMutex).
// Returns TSDB_CODE_SUCCESS or a mutex lock/unlock error.
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
  *required = false;
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;

  TSC_ERR_RET(taosThreadMutexLock(&pInst->qnodeMutex));
  *required = (NULL == pInst->pQnodeList);
  TSC_ERR_RET(taosThreadMutexUnlock(&pInst->qnodeMutex));

  return TSDB_CODE_SUCCESS;
}
558

559
// Produces a qnode list for the request in `*pNodeList`.
//
// When the application instance has a cached list, a copy of it is returned. Otherwise (that is,
// when `*pNodeList` is still NULL after the cache lookup) a new array is created, filled through
// catalogGetQnodeList(), and on success also stored as the new cache via updateQnodeList().
//
// Returns 0 on success, otherwise a mutex, catalog or allocation error code.
// NOTE(review): `*pNodeList` is read without being written when the cache is empty, so callers
// presumably initialize it to NULL — confirm at the call sites.
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
  if (NULL == *pNodeList) {
    SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
      // check the freshly created array itself, not the out-parameter that holds it
      if (NULL == *pNodeList) {
        TSC_ERR_RET(terrno);
      }
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
    }

    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
      code = updateQnodeList(pInfo, *pNodeList);
    }
  }

  return code;
}
590

591
// Creates the query plan for a parsed query. The request's type is set from the query's message
// type, a plan context is filled from the request and its connection, and qCreateQueryPlan() is
// called with the supplied node list. Returns its result.
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SAppInstInfo* pInst = getAppInfo(pRequest);
  STscObj*      pConn = pRequest->pTscObj;

  SPlanContext planCtx = {
      .queryId = pRequest->requestId,
      .acctId = pConn->acctId,
      .mgmtEpSet = getEpSet_s(&pInst->mgmtEp),
      .pAstRoot = pQuery->pRoot,
      .showRewrite = pQuery->showRewrite,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pUser = pConn->user,
      .userId = pConn->userId,
      .timezone = pConn->optionInfo.timezone,
      .sysInfo = pConn->sysInfo,
  };

  return qCreateQueryPlan(&planCtx, pPlan, pNodeList);
}
609

610
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
240,787,867✔
611
                         const SExtSchema* pExtSchema, bool isStmt) {
612
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
240,787,867✔
613
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
614
    return TSDB_CODE_INVALID_PARA;
×
615
  }
616

617
  pResInfo->numOfCols = numOfCols;
240,798,773✔
618
  if (pResInfo->fields != NULL) {
240,796,314✔
619
    taosMemoryFree(pResInfo->fields);
13,360✔
620
  }
621
  if (pResInfo->userFields != NULL) {
240,794,739✔
622
    taosMemoryFree(pResInfo->userFields);
13,360✔
623
  }
624
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
240,777,328✔
625
  if (NULL == pResInfo->fields) return terrno;
240,786,789✔
626
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
240,783,696✔
627
  if (NULL == pResInfo->userFields) {
240,792,081✔
628
    taosMemoryFree(pResInfo->fields);
×
629
    return terrno;
×
630
  }
631
  if (numOfCols != pResInfo->numOfCols) {
240,795,202✔
632
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
633
    return TSDB_CODE_FAILED;
×
634
  }
635

636
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
1,348,995,433✔
637
    pResInfo->fields[i].type = pSchema[i].type;
1,108,195,864✔
638

639
    pResInfo->userFields[i].type = pSchema[i].type;
1,108,207,502✔
640
    // userFields must convert to type bytes, no matter isStmt or not
641
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
1,108,214,942✔
642
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
1,108,195,850✔
643
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
1,108,198,762✔
644
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
1,273,284✔
645
    }
646

647
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
1,108,192,691✔
648
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
1,108,209,539✔
649
  }
650
  return TSDB_CODE_SUCCESS;
240,801,598✔
651
}
652

653
// Stores `precision` in the result info when it is one of the milli/micro/nano time precisions;
// any other value is ignored and the stored precision is left unchanged.
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  bool known = (precision == TSDB_TIME_PRECISION_MILLI) || (precision == TSDB_TIME_PRECISION_MICRO) ||
               (precision == TSDB_TIME_PRECISION_NANO);
  if (known) {
    pResInfo->precision = precision;
  }
}
661

662
// Builds the execution node list for the vnode/client query policies.
//
// Every vgroup found in `pDbVgList` (an array of per-database vgroup arrays) contributes one
// SQueryNodeLoad entry. When no vgroup was found, the entries of `pMnodeList` are used instead;
// when that is empty too, an empty list is returned. On success `*pNodeList` receives the new
// array. On failure the array is destroyed and TSDB_CODE_OUT_OF_RANGE or terrno is returned.
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
  SArray* pLoads = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pLoads) {
    return terrno;
  }

  char* policyName = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";

  int32_t numOfDbs = taosArrayGetSize(pDbVgList);
  for (int32_t dbIdx = 0; dbIdx < numOfDbs; ++dbIdx) {
    SArray* pVgroups = taosArrayGetP(pDbVgList, dbIdx);
    if (NULL == pVgroups) {
      continue;
    }

    // an empty vgroup array simply contributes nothing
    int32_t numOfVgroups = taosArrayGetSize(pVgroups);
    for (int32_t vgIdx = 0; vgIdx < numOfVgroups; ++vgIdx) {
      SVgroupInfo* pVgroup = taosArrayGet(pVgroups, vgIdx);
      if (NULL == pVgroup) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }

      SQueryNodeLoad entry = {0};
      entry.addr.nodeId = pVgroup->vgId;
      entry.addr.epSet = pVgroup->epSet;

      if (NULL == taosArrayPush(pLoads, &entry)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }
    }
  }

  int32_t numOfVnodes = taosArrayGetSize(pLoads);
  if (numOfVnodes > 0) {
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policyName, numOfVnodes);
  } else {
    // no vgroup available: fall back to the mnode list
    int32_t numOfMnodes = taosArrayGetSize(pMnodeList);
    if (numOfMnodes <= 0) {
      tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policyName);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirstMnode) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pLoads, pFirstMnode, numOfMnodes)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policyName, numOfMnodes);
    }
  }

  *pNodeList = pLoads;

  return TSDB_CODE_SUCCESS;
}
727

728
// Builds the execution node list for the qnode query policy.
//
// The entries of `pQnodeList` are used when it is non-empty; otherwise the entries of
// `pMnodeList`; when both are empty an empty list is returned. On success `*pNodeList` receives
// the new array. On failure the array is destroyed and TSDB_CODE_OUT_OF_RANGE or terrno is returned.
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
  SArray* pLoads = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pLoads) {
    return terrno;
  }

  int32_t numOfQnodes = taosArrayGetSize(pQnodeList);
  if (numOfQnodes > 0) {
    void* pFirstQnode = taosArrayGet(pQnodeList, 0);
    if (NULL == pFirstQnode) {
      taosArrayDestroy(pLoads);
      return TSDB_CODE_OUT_OF_RANGE;
    }
    if (NULL == taosArrayAddBatch(pLoads, pFirstQnode, numOfQnodes)) {
      taosArrayDestroy(pLoads);
      return terrno;
    }
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, numOfQnodes);
  } else {
    // no qnode available: fall back to the mnode list
    int32_t numOfMnodes = taosArrayGetSize(pMnodeList);
    if (numOfMnodes <= 0) {
      tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirstMnode) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pLoads, pFirstMnode, numOfMnodes)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, numOfMnodes);
    }
  }

  *pNodeList = pLoads;

  return TSDB_CODE_SUCCESS;
}
773

774
void freeVgList(void* list) {
13,034,127✔
775
  SArray* pList = *(SArray**)list;
13,034,127✔
776
  taosArrayDestroy(pList);
13,041,011✔
777
}
13,027,469✔
778

779
/*
 * Build the node list used to execute an asynchronous query, based on
 * tsQueryPolicy.
 *
 * VNODE / CLIENT policy:
 *   - with pResultMeta: the vgroup lists already stored in
 *     pResultMeta->pDbVgroup are referenced (entries with an error code or a
 *     NULL result are skipped); they are not freed here (fp stays NULL).
 *   - without pResultMeta: the vgroup list of every db in pRequest->dbList is
 *     fetched through the catalog; those lists are released here via
 *     freeVgList.
 *   The collected lists are passed to buildVnodePolicyNodeList().
 *
 * HYBRID / QNODE policy:
 *   A private copy of the qnode list is taken, either from
 *   pResultMeta->pQnodeList or from pInst->pQnodeList (under qnodeMutex), and
 *   passed to buildQnodePolicyNodeList().
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_APP_ERROR for an
 * unknown policy. The temporary lists are released before returning.
 */
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  FDelete fp = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      if (pResultMeta) {
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
        for (int32_t i = 0; i < dbNum; ++i) {
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
          if (NULL == pRes || pRes->code || NULL == pRes->pRes) {
            continue;
          }

          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
            code = terrno;
            goto _return;
          }
        }
      } else {
        // The vgroup lists fetched below are owned by pDbVgList and must be freed with it.
        fp = freeVgList;

        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
        if (dbNum > 0) {
          SCatalog*     pCtg = NULL;
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
          code = catalogGetHandle(pInst->clusterId, &pCtg);
          if (code != TSDB_CODE_SUCCESS) {
            goto _return;
          }

          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
          if (NULL == pDbVgList) {
            code = terrno;
            goto _return;
          }
          for (int32_t i = 0; i < dbNum; ++i) {
            // Reset per iteration so a stale pointer is never pushed twice.
            SArray*          pVgList = NULL;
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                     .requestId = pRequest->requestId,
                                     .requestObjRefId = pRequest->self,
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

            // catalogGetDBVgList will handle dbFName == null.
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
            if (code) {
              goto _return;
            }

            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
              code = terrno;
              // pVgList was not stored in pDbVgList, so it has to be released here.
              taosArrayDestroy(pVgList);
              goto _return;
            }
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
        if (pRes->code) {
          pQnodeList = NULL;
        } else {
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            goto _return;
          }
        }
      } else {
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
        if (pInst->pQnodeList) {
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
          if (NULL == pQnodeList) {
            // Capture the error first, then drop the lock before leaving;
            // otherwise qnodeMutex would stay locked.
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            (void)taosThreadMutexUnlock(&pInst->qnodeMutex);
            goto _return;
          }
        }
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
      }

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:
  taosArrayDestroyEx(pDbVgList, fp);
  taosArrayDestroy(pQnodeList);

  return code;
}
888

889
/*
 * Build the node list used to execute a synchronous query, based on
 * tsQueryPolicy.
 *
 * VNODE / CLIENT policy: the vgroup list of every db in pRequest->dbList is
 * fetched through the catalog and passed to buildVnodePolicyNodeList().
 * HYBRID / QNODE policy: the qnode list is obtained with getQnodeList() and
 * passed to buildQnodePolicyNodeList().
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_APP_ERROR for an
 * unknown policy. The temporary lists are released before returning.
 */
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
      if (dbNum > 0) {
        SCatalog*     pCtg = NULL;
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        code = catalogGetHandle(pInst->clusterId, &pCtg);
        if (code != TSDB_CODE_SUCCESS) {
          goto _return;
        }

        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        for (int32_t i = 0; i < dbNum; ++i) {
          // Reset per iteration so a stale pointer is never pushed twice.
          SArray*          pVgList = NULL;
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                   .requestId = pRequest->requestId,
                                   .requestObjRefId = pRequest->self,
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

          // catalogGetDBVgList will handle dbFName == null.
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
          if (code) {
            goto _return;
          }

          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
            code = terrno;
            // pVgList was not stored in pDbVgList, so it has to be released here.
            taosArrayDestroy(pVgList);
            goto _return;
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:

  // Every element of pDbVgList was fetched above and is owned by the list.
  taosArrayDestroyEx(pDbVgList, freeVgList);
  taosArrayDestroy(pQnodeList);

  return code;
}
954

955
/*
 * Run a query plan synchronously through the scheduler.
 *
 * The execution result replaces pRequest->body.resInfo.execRes. For
 * submit/delete/create-table requests the affected row count is recorded and
 * the scheduler job is freed right away; for submit requests the cluster
 * insert-row counter is also increased.
 *
 * Returns the value stored in pRequest->code: the schedulerExecJob() error
 * when that call fails, otherwise the code carried by the execution result.
 * terrno is set to the same value.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  SExecResult      res = {0};
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self};
  SSchedulerReq    req = {
         .syncReq = true,
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
         .pConn = &conn,
         .pNodeList = pNodeList,
         .pDag = pDag,
         .sql = pRequest->sqlstr,
         .startTs = pRequest->metric.start,
         .execFp = NULL,
         .cbParam = NULL,
         .chkKillFp = chkRequestKilled,
         .chkKillParam = (void*)pRequest->self,
         .pExecRes = &res,
         .source = pRequest->source,
         .pWorkerCb = getTaskPoolWorkerCb(),
  };

  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);

  // Drop the previous result before taking over the new one.
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));

  if (code != TSDB_CODE_SUCCESS) {
    schedulerFreeJob(&pRequest->body.queryJob, 0);

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
      TDMT_VND_CREATE_TABLE == pRequest->type) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;
    if (TDMT_VND_SUBMIT == pRequest->type) {
      // record the insert rows
      SAppClusterSummary* pActivity = &pRequest->pTscObj->pAppInfo->summary;
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
    }

    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  pRequest->code = res.code;
  terrno = res.code;
  return pRequest->code;
}
1008

1009
/*
 * Post-process the result of a submit request.
 *
 * `res` is read as an SSubmitRsp2. For every entry of its aCreateTbRsp array
 * that carries table meta, the meta is forwarded to handleCreateTbExecRes().
 * Processing stops at the first failure and that error is returned.
 * pRequest and epset are not used by this function.
 */
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
  if (NULL == pRsp->aCreateTbRsp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
  for (int32_t i = 0; i < tbNum; ++i) {
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
    // Skip entries that are missing or that carry no table meta.
    if (NULL == pTbRsp || NULL == pTbRsp->pMeta) {
      continue;
    }
    TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
  }

  return TSDB_CODE_SUCCESS;
}
1026

1027
/*
 * Post-process the result of a query request.
 *
 * `res` is read as an array of STbVerInfo. Each entry is converted into an
 * STbSVersion and the collected versions are handed to
 * catalogChkTbMetaVersion(). An empty array is a no-op.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when building the version array fails, or
 * the result of the catalog check.
 */
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SArray* pVerInfos = (SArray*)res;
  int32_t numTables = taosArrayGetSize(pVerInfos);
  if (numTables <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(numTables, sizeof(STbSVersion));
  if (pVersions == NULL) {
    return terrno;
  }

  int32_t code = 0;
  int32_t idx = 0;
  for (; idx < numTables; ++idx) {
    STbVerInfo* pInfo = taosArrayGet(pVerInfos, idx);
    if (pInfo == NULL) {
      code = terrno;
      break;
    }

    STbSVersion entry = {
        .tbFName = pInfo->tbFName, .sver = pInfo->sversion, .tver = pInfo->tversion, .rver = pInfo->rversion};
    if (taosArrayPush(pVersions, &entry) == NULL) {
      code = terrno;
      break;
    }
  }

  // The catalog check only runs when every entry was converted.
  if (idx == numTables) {
    SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                             .requestId = pRequest->requestId,
                             .requestObjRefId = pRequest->self,
                             .mgmtEps = *epset};

    code = catalogChkTbMetaVersion(pCatalog, &conn, pVersions);
  }

  taosArrayDestroy(pVersions);
  return code;
}
1067

1068
/*
 * Forward the table meta carried by an alter-table result (`res`, read as an
 * STableMetaRsp) to catalogUpdateTableMeta() and return its result.
 */
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}
1071

1072
/*
 * Forward the table meta carried by a create-table result (`res`, read as an
 * STableMetaRsp) to catalogAsyncUpdateTableMeta() and return its result.
 */
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogAsyncUpdateTableMeta(pCatalog, pMetaRsp);
}
1075

1076
/*
 * Dispatch the execution result stored in pRequest->body.resInfo.execRes to
 * the handler matching its message type.
 *
 * When no result is attached, pRequest->code is returned unchanged.
 * Otherwise the catalog handle of the request's cluster is looked up and:
 *   - alter table / alter stb   -> handleAlterTbExecRes()
 *   - create table (vnode)      -> handleCreateTbExecRes() for every entry
 *   - create stb                -> handleCreateTbExecRes()
 *   - submit                    -> insert-bytes counter + handleSubmitExecRes()
 *   - query / merge query       -> handleQueryExecRes()
 *   - anything else             -> TSDB_CODE_APP_ERROR
 *
 * Returns the handler's code. For the create-table list every entry is
 * processed and the first failure is the one reported.
 */
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
  if (NULL == pRequest->body.resInfo.execRes.res) {
    return pRequest->code;
  }

  SCatalog*     pCatalog = NULL;
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);

  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
  if (code) {
    return code;
  }

  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
  SExecResult* pRes = &pRequest->body.resInfo.execRes;

  switch (pRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
      break;
    }
    case TDMT_VND_CREATE_TABLE: {
      SArray* pList = (SArray*)pRes->res;
      int32_t num = taosArrayGetSize(pList);
      for (int32_t i = 0; i < num; ++i) {
        void* res = taosArrayGetP(pList, i);
        // handleCreateTbExecRes will handle res == null
        int32_t tbCode = handleCreateTbExecRes(res, pCatalog);
        // Keep the first failure instead of letting a later success hide it.
        if (TSDB_CODE_SUCCESS == code) {
          code = tbCode;
        }
      }
      break;
    }
    case TDMT_MND_CREATE_STB: {
      code = handleCreateTbExecRes(pRes->res, pCatalog);
      break;
    }
    case TDMT_VND_SUBMIT: {
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);

      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
      break;
    }
    default:
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
               pRequest->type, pRequest->requestId);
      code = TSDB_CODE_APP_ERROR;
      break;
  }

  return code;
}
1131

1132
/*
 * Returns the fileProcessing flag of a vnode-modify statement; any other
 * statement type yields false.
 */
static bool incompletaFileParsing(SNode* pStmt) {
  if (QUERY_NODE_VNODE_MODIFY_STMT == nodeType(pStmt)) {
    return ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
  }
  return false;
}
1135

1136
/*
 * Resume parsing and planning of pRequest once its sub query has produced
 * pBlock (may be NULL), then hand the outcome to handleQueryAnslyseRes().
 *
 * The parse step runs with the node allocator of the parse context acquired;
 * its duration is added to pRequest->metric.analyseCostUs. The plan step
 * only runs when parsing succeeded. The allocator is released afterwards in
 * every case. The first error encountered is the one reported; a release
 * failure is only reported when everything before it succeeded.
 */
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;

  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    int64_t analyseStart = taosGetTimestampUs();
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
  }

  // Do not let the release result overwrite an earlier parse/plan error.
  int32_t releaseCode = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    code = releaseCode;
  }

  handleQueryAnslyseRes(pWrapper, NULL, code);
}
×
1153

1154
/*
 * Deliver the outcome of pRequest to the client.
 *
 * When pRequest is itself the user request (userRefId is 0 or equals
 * pRequest->self) its own callback is invoked. Otherwise the user request is
 * looked up by userRefId, receives pRequest->code and has its callback
 * invoked; a missing user request is only logged.
 */
void returnToUser(SRequestObj* pRequest) {
  if (0 == pRequest->relation.userRefId || pRequest->self == pRequest->relation.userRefId) {
    // return to client
    doRequestCallback(pRequest, pRequest->code);
    return;
  }

  SRequestObj* pOwner = acquireRequest(pRequest->relation.userRefId);
  if (NULL == pOwner) {
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.userRefId, pRequest->requestId);
    return;
  }

  pOwner->code = pRequest->code;
  // return to client
  doRequestCallback(pOwner, pOwner->code);
  (void)releaseRequest(pRequest->relation.userRefId);
}
1173

1174
/*
 * Copy `numOfRows` rows fetched from `pRes` into a newly created data block.
 *
 * One column is created per result field. Every row must provide non-NULL
 * values in its first three fields, which are read as int64 timestamp, int32
 * vgId and int64 vgVer. The largest timestamp seen becomes the block's
 * window.ekey.
 *
 * On success *pBlock holds the filled block and TSDB_CODE_SUCCESS is
 * returned. On failure the block is destroyed, *pBlock is reset to NULL and
 * an error code is returned.
 */
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
  int64_t     lastTs = 0;
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
  int32_t     numOfFields = taos_num_fields(pRes);

  int32_t code = createDataBlock(pBlock);
  if (code) {
    return code;
  }

  if (numOfFields > 0 && NULL == pResFields) {
    tscError("invalid data from vnode");
    code = TSDB_CODE_TSC_INTERNAL_ERROR;
    goto _error;
  }

  for (int32_t i = 0; i < numOfFields; ++i) {
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(*pBlock, numOfRows);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfRows; ++i) {
    TAOS_ROW pRow = taos_fetch_row(pRes);
    // A missing row, or a row with fewer than three fields, must not be dereferenced below.
    if (NULL == pRow || numOfFields < 3 || NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
      tscError("invalid data from vnode");
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto _error;
    }
    int64_t ts = *(int64_t*)pRow[0];
    if (lastTs < ts) {
      lastTs = ts;
    }

    for (int32_t j = 0; j < numOfFields; ++j) {
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
      if (TSDB_CODE_SUCCESS != code) {
        goto _error;
      }
    }

    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
            *(int64_t*)pRow[2]);
  }

  (*pBlock)->info.window.ekey = lastTs;
  (*pBlock)->info.rows = numOfRows;

  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
  return TSDB_CODE_SUCCESS;

_error:
  blockDataDestroy(*pBlock);
  *pBlock = NULL;  // do not leave a dangling pointer with the caller
  return code;
}
1230

1231
/*
 * Fetch callback of a sub query (`res` is the sub request itself).
 *
 * A failed sub request, or a failure to convert the fetched rows into a data
 * block, is reported through returnToUser(). Otherwise the block is passed to
 * the request referenced by relation.nextRefId via continuePostSubQuery();
 * a missing next request is only logged. The block is destroyed afterwards.
 */
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
  SRequestObj* pSubReq = (SRequestObj*)res;
  if (pSubReq->code) {
    returnToUser(pSubReq);
    return;
  }

  SSDataBlock* pResultBlock = NULL;
  pSubReq->code = createResultBlock(res, rowNum, &pResultBlock);
  if (TSDB_CODE_SUCCESS != pSubReq->code) {
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pSubReq->self, pSubReq->requestId,
             tstrerror(pSubReq->code));
    returnToUser(pSubReq);
    return;
  }

  SRequestObj* pFollowUp = acquireRequest(pSubReq->relation.nextRefId);
  if (NULL == pFollowUp) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pSubReq->self,
             pSubReq->relation.nextRefId, pSubReq->requestId);
  } else {
    continuePostSubQuery(pFollowUp, pResultBlock);
    (void)releaseRequest(pSubReq->relation.nextRefId);
  }

  blockDataDestroy(pResultBlock);
}
1258

1259
/*
 * Continue with the request that follows a finished sub query.
 *
 * When the sub request produced a query result, its rows are fetched
 * asynchronously and postSubQueryFetchCb() takes over. Otherwise the next
 * request (relation.nextRefId) is continued directly without a data block; a
 * missing next request is only logged.
 */
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
  SRequestObj* pSubReq = pWrapper->pRequest;
  if (TD_RES_QUERY(pSubReq)) {
    taosAsyncFetchImpl(pSubReq, postSubQueryFetchCb, pWrapper);
    return;
  }

  SRequestObj* pFollowUp = acquireRequest(pSubReq->relation.nextRefId);
  if (NULL == pFollowUp) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pSubReq->self,
             pSubReq->relation.nextRefId, pSubReq->requestId);
    return;
  }

  continuePostSubQuery(pFollowUp, NULL);
  (void)releaseRequest(pSubReq->relation.nextRefId);
}
1275

1276
// TODO: refactor the error code management
1277
/*
 * Scheduler completion callback of an asynchronous query.
 *
 * `param` is the SSqlCallbackWrapper of the request; `pResult` (may be NULL)
 * is copied into the request's execRes and then freed. Steps, in order:
 *   1. store `code` and the execution result in the request;
 *   2. for submit/delete/create-table requests: accumulate the row count,
 *      update the cluster insert-row counter (submit only) and free the job;
 *   3. when the error qualifies for client-side handling, drop the cached
 *      meta of the target tables and restart the query;
 *   4. otherwise run handleQueryExecRsp(); its error is adopted only when the
 *      request had succeeded so far;
 *   5. continue a pending CSV insert, continue with a post sub query, or
 *      destroy the wrapper and invoke the user callback.
 */
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
  SSqlCallbackWrapper* pCbWrapper = param;
  SRequestObj*         pReq = pCbWrapper->pRequest;
  STscObj*             pConnObj = pReq->pTscObj;

  pReq->code = code;
  if (pResult) {
    destroyQueryExecRes(&pReq->body.resInfo.execRes);
    (void)memcpy(&pReq->body.resInfo.execRes, pResult, sizeof(*pResult));
  }

  int32_t reqType = pReq->type;
  bool    isModifyReq =
      (TDMT_VND_SUBMIT == reqType) || (TDMT_VND_DELETE == reqType) || (TDMT_VND_CREATE_TABLE == reqType);
  if (isModifyReq) {
    if (pResult) {
      pReq->body.resInfo.numOfRows += pResult->numOfRows;

      // record the insert rows
      if (TDMT_VND_SUBMIT == reqType) {
        SAppClusterSummary* pSummary = &pConnObj->pAppInfo->summary;
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, pResult->numOfRows);
      }
    }
    schedulerFreeJob(&pReq->body.queryJob, 0);
  }

  taosMemoryFree(pResult);
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pReq->self, tstrerror(code),
           pReq->requestId);

  // Errors the client can recover from: invalidate cached meta and retry.
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pReq->sqlstr != NULL &&
      pReq->stmtBindVersion == 0) {
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
             pReq->self, tstrerror(code), pReq->retry, pReq->requestId);
    if (TSDB_CODE_SUCCESS != removeMeta(pConnObj, pReq->targetTableList, IS_VIEW_REQUEST(pReq->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pReq->self, pReq->requestId);
    }
    restartAsyncQuery(pReq, code);
    return;
  }

  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pReq->self, TMSG_INFO(pReq->type));
  if (NEED_CLIENT_RM_TBLMETA_REQ(pReq->type) && NULL == pReq->body.resInfo.execRes.res) {
    if (TSDB_CODE_SUCCESS != removeMeta(pConnObj, pReq->targetTableList, IS_VIEW_REQUEST(pReq->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pReq->self, pReq->requestId);
    }
  }

  pReq->metric.execCostUs = taosGetTimestampUs() - pReq->metric.execStart;

  // A post-processing error only replaces a so-far successful request code.
  int32_t rspCode = handleQueryExecRsp(pReq);
  if (pReq->code == TSDB_CODE_SUCCESS && pReq->code != rspCode) {
    pReq->code = rspCode;
  }

  if (pReq->code == TSDB_CODE_SUCCESS && NULL != pReq->pQuery && incompletaFileParsing(pReq->pQuery->pRoot)) {
    continueInsertFromCsv(pCbWrapper, pReq);
    return;
  }

  if (pReq->relation.nextRefId) {
    handlePostSubQuery(pCbWrapper);
    return;
  }

  destorySqlCallbackWrapper(pCbWrapper);
  pReq->pWrapper = NULL;

  // return to client
  doRequestCallback(pReq, code);
}
1346

1347
/*
 * Execute a parsed query synchronously according to pQuery->execMode.
 *
 *   LOCAL        -> execLocalCmd() (invalid parameter when there is no root)
 *   RPC          -> execDdlQuery()
 *   SCHEDULE     -> getPlan(), buildSyncExecNodeList(), session metric check,
 *                   scheduleQuery()
 *   EMPTY_RESULT -> only marks the request type
 * LOCAL and RPC are skipped, and SCHEDULE stops after planning, when
 * pRequest->validateOnly is set.
 *
 * Before dispatching, the per-cluster insert/query request counters are
 * updated (not on a retry). Afterwards pQuery is destroyed unless keepQuery
 * is set, cached table meta is dropped where the request type requires it,
 * and handleQueryExecRsp() runs when everything succeeded. Any error is
 * stored in pRequest->code. When `res` is not NULL, ownership of the
 * execution result pointer is moved to *res.
 */
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
  int32_t code = 0;

  if (pQuery->pRoot) {
    pRequest->stmtType = pQuery->pRoot->type;
  }

  if (pQuery->pRoot && !pRequest->inRetry) {
    STscObj*            pTscObj = pRequest->pTscObj;
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
    }
  }

  pRequest->body.execMode = pQuery->execMode;
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      if (!pRequest->validateOnly) {
        if (NULL == pQuery->pRoot) {
          terrno = TSDB_CODE_INVALID_PARA;
          code = terrno;
        } else {
          code = execLocalCmd(pRequest, pQuery);
        }
      }
      break;
    case QUERY_EXEC_MODE_RPC:
      if (!pRequest->validateOnly) {
        code = execDdlQuery(pRequest, pQuery);
      }
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
      if (NULL == pMnodeList) {
        code = terrno;
        break;
      }
      SQueryPlan* pDag = NULL;
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
      if (TSDB_CODE_SUCCESS == code) {
        pRequest->body.subplanNum = pDag->numOfSubplans;
        if (!pRequest->validateOnly) {
          SArray* pNodeList = NULL;
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);

          if (TSDB_CODE_SUCCESS == code) {
            code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
                                        taosArrayGetSize(pNodeList));
          }

          if (TSDB_CODE_SUCCESS == code) {
            code = scheduleQuery(pRequest, pDag, pNodeList);
          }
          taosArrayDestroy(pNodeList);
        }
      }
      // NOTE(review): when scheduleQuery() is not reached (validateOnly, or a
      // failure while building/checking the node list) pDag is not released
      // in this function - confirm who owns the plan on those paths.
      taosArrayDestroy(pMnodeList);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    default:
      break;
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
    if (TSDB_CODE_SUCCESS != ret) {
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
               pRequest->requestId);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = handleQueryExecRsp(pRequest);
  }

  if (TSDB_CODE_SUCCESS != code) {
    pRequest->code = code;
  }

  if (res) {
    // Hand the result pointer over to the caller and forget it locally.
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }
}
13,503,113✔
1442

1443
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
722,719,555✔
1444
                                 SSqlCallbackWrapper* pWrapper) {
1445
  int32_t code = TSDB_CODE_SUCCESS;
722,719,555✔
1446
  pRequest->type = pQuery->msgType;
722,719,555✔
1447
  SArray*     pMnodeList = NULL;
722,721,699✔
1448
  SArray*     pNodeList = NULL;
722,721,699✔
1449
  SQueryPlan* pDag = NULL;
722,708,375✔
1450
  int64_t     st = taosGetTimestampUs();
722,703,365✔
1451

1452
  if (!pRequest->parseOnly) {
722,703,365✔
1453
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
722,727,166✔
1454
    if (NULL == pMnodeList) {
722,705,258✔
1455
      code = terrno;
×
1456
    }
1457
    SPlanContext cxt = {.queryId = pRequest->requestId,
777,750,290✔
1458
                        .acctId = pRequest->pTscObj->acctId,
722,730,883✔
1459
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
722,738,031✔
1460
                        .pAstRoot = pQuery->pRoot,
722,735,327✔
1461
                        .showRewrite = pQuery->showRewrite,
722,735,925✔
1462
                        .isView = pWrapper->pParseCtx->isView,
722,733,528✔
1463
                        .isAudit = pWrapper->pParseCtx->isAudit,
722,729,266✔
1464
                        .pMsg = pRequest->msgBuf,
722,729,006✔
1465
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1466
                        .pUser = pRequest->pTscObj->user,
722,723,700✔
1467
                        .userId = pRequest->pTscObj->userId,
722,738,376✔
1468
                        .sysInfo = pRequest->pTscObj->sysInfo,
722,733,763✔
1469
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
722,727,650✔
1470
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
722,728,776✔
1471
    if (TSDB_CODE_SUCCESS == code) {
722,732,772✔
1472
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
722,729,329✔
1473
    }
1474
    if (code) {
722,724,139✔
1475
      tscError("req:0x%" PRIx64 " requestId:0x%" PRIx64 ", failed to create query plan, code:%s msg:%s", pRequest->self,
223,139✔
1476
               pRequest->requestId, tstrerror(code), cxt.pMsg);
1477
    } else {
1478
      pRequest->body.subplanNum = pDag->numOfSubplans;
722,501,000✔
1479
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
722,500,454✔
1480
    }
1481
  }
1482

1483
  pRequest->metric.execStart = taosGetTimestampUs();
722,717,612✔
1484
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
722,719,760✔
1485

1486
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
722,718,704✔
1487
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
722,471,122✔
1488
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
133,928,738✔
1489
    }
1490

1491
    if (code == TSDB_CODE_SUCCESS) {
722,481,241✔
1492
      code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
722,477,443✔
1493
                                  taosArrayGetSize(pNodeList));
722,479,123✔
1494
    }
1495

1496
    if (code == TSDB_CODE_SUCCESS) {
722,478,180✔
1497
      SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
722,475,580✔
1498
                               .requestId = pRequest->requestId,
722,488,410✔
1499
                               .requestObjRefId = pRequest->self};
722,493,333✔
1500
      SSchedulerReq    req = {
750,003,851✔
1501
             .syncReq = false,
1502
             .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
722,473,074✔
1503
             .pConn = &conn,
1504
             .pNodeList = pNodeList,
1505
             .pDag = pDag,
1506
             .allocatorRefId = pRequest->allocatorRefId,
722,473,074✔
1507
             .sql = pRequest->sqlstr,
722,463,501✔
1508
             .startTs = pRequest->metric.start,
722,472,064✔
1509
             .execFp = schedulerExecCb,
1510
             .cbParam = pWrapper,
1511
             .chkKillFp = chkRequestKilled,
1512
             .chkKillParam = (void*)pRequest->self,
722,472,080✔
1513
             .pExecRes = NULL,
1514
             .source = pRequest->source,
722,462,965✔
1515
             .pWorkerCb = getTaskPoolWorkerCb(),
722,470,866✔
1516
      };
1517

1518
      if (TSDB_CODE_SUCCESS == code) {
722,475,291✔
1519
        code = schedulerExecJob(&req, &pRequest->body.queryJob);
722,489,759✔
1520
      }
1521
      taosArrayDestroy(pNodeList);
722,476,312✔
1522
      taosArrayDestroy(pMnodeList);
722,488,798✔
1523
      return code;
722,487,483✔
1524
    }
1525
  }
1526

1527
  qDestroyQueryPlan(pDag);
257,728✔
1528
  tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
246,148✔
1529
           pRequest->requestId);
1530
  destorySqlCallbackWrapper(pWrapper);
246,148✔
1531
  pRequest->pWrapper = NULL;
246,148✔
1532
  if (TSDB_CODE_SUCCESS != code) {
246,148✔
1533
    pRequest->code = code;
225,739✔
1534
  }
1535

1536
  doRequestCallback(pRequest, code);
246,148✔
1537

1538
  // todo not to be released here
1539
  taosArrayDestroy(pMnodeList);
246,148✔
1540
  taosArrayDestroy(pNodeList);
246,148✔
1541

1542
  return code;
246,148✔
1543
}
1544

1545
/*
 * Route a parsed statement to the executor that matches pQuery->execMode.
 *
 * - parse-only requests are completed immediately with code 0;
 * - for every mode except QUERY_EXEC_MODE_SCHEDULE the callback wrapper is destroyed here
 *   and detached from the request;
 * - insert/select counters in the app cluster summary are bumped when the statement has a
 *   root node and the request is not a retry.
 */
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
  if (pRequest->parseOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  pRequest->body.execMode = pQuery->execMode;
  if (pRequest->body.execMode != QUERY_EXEC_MODE_SCHEDULE) {
    // only the scheduler path is handed the wrapper below
    destorySqlCallbackWrapper(pWrapper);
    pRequest->pWrapper = NULL;
  }

  if (!pRequest->inRetry && pQuery->pRoot != NULL) {
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;

    if (pQuery->pRoot->type == QUERY_NODE_SELECT_STMT) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
    } else if (pQuery->pRoot->type == QUERY_NODE_VNODE_MODIFY_STMT &&
               ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType == 0) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
    }
  }

  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      asyncExecLocalCmd(pRequest, pQuery);
      break;

    case QUERY_EXEC_MODE_RPC:
      // the return code is not consumed by this function
      (void)asyncExecDdlQuery(pRequest, pQuery);
      break;

    case QUERY_EXEC_MODE_SCHEDULE:
      // the return code is not consumed by this function
      (void)asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
      break;

    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      doRequestCallback(pRequest, 0);
      break;

    default:
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
      doRequestCallback(pRequest, -1);
      break;
  }
}
1591

1592
/*
 * Refresh the catalog entries for every database and table recorded on the request.
 *
 * Returns TSDB_CODE_APP_ERROR when the request lists neither databases nor tables,
 * otherwise the first non-success code reported by the catalog, or success.
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  int32_t numOfTbs = taosArrayGetSize(pRequest->tableList);
  if (numOfDbs <= 0 && numOfTbs <= 0) {
    return TSDB_CODE_APP_ERROR;
  }

  SCatalog* pCtg = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestConnInfo connInfo = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // a NULL element is passed through; the original notes the catalog call handles it
  for (int32_t dbIdx = 0; TSDB_CODE_SUCCESS == code && dbIdx < numOfDbs; ++dbIdx) {
    code = catalogRefreshDBVgInfo(pCtg, &connInfo, taosArrayGet(pRequest->dbList, dbIdx));
  }

  // skipped entirely when a database refresh already failed
  for (int32_t tbIdx = 0; TSDB_CODE_SUCCESS == code && tbIdx < numOfTbs; ++tbIdx) {
    code = catalogRefreshTableMeta(pCtg, &connInfo, taosArrayGet(pRequest->tableList, tbIdx), -1);
  }

  return code;
}
1634

1635
/*
 * Drop cached catalog metadata for every name in tbList.
 *
 * When isView is true the entries are treated as views (NULL entries are skipped);
 * otherwise each entry is handed to catalogRemoveTableMeta as-is.
 * Stops at the first catalog error (via TSC_ERR_RET).
 */
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
  SCatalog* pCtg = NULL;
  int32_t   numOfNames = taosArrayGetSize(tbList);
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  for (int32_t idx = 0; idx < numOfNames; ++idx) {
    SName* pName = taosArrayGet(tbList, idx);

    if (!isView) {
      TSC_ERR_RET(catalogRemoveTableMeta(pCtg, pName));
      continue;
    }

    if (pName == NULL) {
      continue;
    }

    char dbFName[TSDB_DB_FNAME_LEN];
    (void)tNameGetFullDbName(pName, dbFName);
    TSC_ERR_RET(catalogRemoveViewMeta(pCtg, dbFName, 0, pName->tname, 0));
  }

  return TSDB_CODE_SUCCESS;
}
1662

1663
/*
 * Build the management endpoint set from the configured firstEp / secondEp strings.
 *
 * Each non-empty endpoint is parsed into the next free slot of pEpSet->epSet and kept only
 * if its FQDN resolves; an endpoint that fails to resolve is logged and its slot is zeroed.
 *
 * Returns 0 when at least one endpoint was accepted. On failure a TSDB error code is
 * returned (never a bare -1, so callers that propagate the value into terrno get a real code):
 *   - TSDB_CODE_TSC_INVALID_FQDN       endpoint string too long, or firstEp failed to parse
 *   - code from taosGetFqdnPortFromEp  secondEp failed to parse
 *   - TSDB_CODE_RPC_NETWORK_UNAVAIL    no endpoint could be resolved
 */
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;  // was `return -1`, inconsistent with every other failure path
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      // discard the unresolved endpoint so the slot can be reused by secondEp
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  return 0;
}
1722

1723
/*
 * Create a connection object, send a TDMT_MND_CONNECT request to the management endpoint set
 * and block until the response semaphore is posted.
 *
 * On success *pTscObj holds the new connection, except for CONN_TYPE__AUTH_TEST where the
 * connection is closed again and *pTscObj is reset to NULL.
 * On a failed connect response the request and connection are released, *pTscObj is reset to
 * NULL and the response code is returned (also stored in terrno).
 */
int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp,
                        void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
  *pTscObj = NULL;
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pRequest = NULL;
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    return code;
  }

  pRequest->sqlstr = taosStrdup("taos_connect");
  if (pRequest->sqlstr) {
    pRequest->sqlLen = strlen(pRequest->sqlstr);
  } else {
    // NOTE(review): this path returns without releasing pRequest or *pTscObj, unlike the
    // neighbouring failure paths - confirm intended ownership before adding cleanup here.
    return terrno;
  }

  SMsgSendInfo* body = NULL;
  code = buildConnectMsg(pRequest, &body, totpCode);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    return code;
  }

  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
    return code;
  }

  // wait for the connect response handler to post the semaphore
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
    destroyTscObj(*pTscObj);
    tscError("failed to wait sem, code:%s", terrstr());
    return terrno;
  }

  if (pRequest->code != TSDB_CODE_SUCCESS) {
    // Select the message from the response code. The local `code` is always
    // TSDB_CODE_SUCCESS at this point, so testing it made the taos_errstr branch unreachable.
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    tscError("failed to connect to server, reason: %s", errorMsg);

    terrno = pRequest->code;
    destroyRequest(pRequest);
    taos_close_internal(*pTscObj);
    *pTscObj = NULL;
    return terrno;
  }

  if (connType == CONN_TYPE__AUTH_TEST) {
    // an auth test only validates credentials; do not keep the connection
    terrno = TSDB_CODE_SUCCESS;
    destroyRequest(pRequest);
    taos_close_internal(*pTscObj);
    *pTscObj = NULL;
    return TSDB_CODE_SUCCESS;
  }

  tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
          (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
  destroyRequest(pRequest);
  return code;
}
1788

1789
/*
 * Allocate and fill the SMsgSendInfo carrying a serialized SConnectReq for pRequest.
 *
 * On success *pMsgSendInfo owns the send info, its `param` (a heap copy of the request ref id)
 * and the serialized payload in msgInfo.pData.
 * On failure everything allocated here is released, *pMsgSendInfo is set to NULL and a
 * non-zero error code is returned.
 */
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode) {
  int32_t     code = TSDB_CODE_SUCCESS;
  void*       pReq = NULL;
  SConnectReq connectReq = {0};

  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  *pMsgSendInfo = pInfo;
  if (pInfo == NULL) {
    return terrno;
  }

  pInfo->msgType = TDMT_MND_CONNECT;
  pInfo->requestObjRefId = pRequest->self;
  pInfo->requestId = pRequest->requestId;
  pInfo->fp = getMsgRspHandle(pInfo->msgType);
  pInfo->param = taosMemoryCalloc(1, sizeof(pRequest->self));
  if (NULL == pInfo->param) {
    code = terrno;
    goto _error;
  }
  *(int64_t*)pInfo->param = pRequest->self;

  STscObj* pObj = pRequest->pTscObj;

  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  } else if (terrno) {
    code = terrno;
    goto _error;
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = appInfo.pid;
  connectReq.startTime = appInfo.startTime;
  connectReq.totpCode = totpCode;
  connectReq.connectTime = taosGetTimestampMs();

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
  tstrncpy(connectReq.token, pObj->token, sizeof(connectReq.token));
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
  tSignConnectReq(&connectReq);

  // first pass computes the encoded size; a negative size must not reach the allocator
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  if (contLen < 0) {
    code = (terrno != 0) ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
    goto _error;
  }

  pReq = taosMemoryMalloc(contLen);
  if (NULL == pReq) {
    code = terrno;
    goto _error;
  }

  if (tSerializeSConnectReq(pReq, contLen, &connectReq) < 0) {
    // never report success after the buffers have been released
    code = (terrno != 0) ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
    goto _error;
  }

  pInfo->msgInfo.len = contLen;
  pInfo->msgInfo.pData = pReq;
  return TSDB_CODE_SUCCESS;

_error:
  // previously only the SMsgSendInfo itself was freed, leaking `param` and leaving the
  // caller's pointer dangling
  taosMemoryFree(pReq);
  taosMemoryFree(pInfo->param);
  taosMemoryFree(pInfo);
  *pMsgSendInfo = NULL;
  return code;
}
1850

1851
/*
 * Apply an endpoint-set change reported with a response to the place the message was aimed at:
 * the app-level mnode endpoint set for TARGET_TYPE_MNODE, or the catalog's vgroup endpoint set
 * for TARGET_TYPE_VNODE. Any other target type is only logged.
 *
 * Does nothing when pEpSet is NULL. For mnode/vnode targets a NULL pTscObj is logged as an
 * error and the update is skipped.
 */
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    return;
  }

  if (TARGET_TYPE_MNODE == pSendInfo->target.type) {
    if (pTscObj == NULL) {
      tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
               TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
      return;
    }

    // snapshot of the current set, used only for the debug message
    SEpSet curEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
    SEp*   pCurEp = &curEpSet.eps[curEpSet.inUse];
    SEp*   pNextEp = &pEpSet->eps[pEpSet->inUse];
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", curEpSet.inUse, curEpSet.numOfEps,
             pCurEp->fqdn, pCurEp->port, pEpSet->inUse, pEpSet->numOfEps, pNextEp->fqdn, pNextEp->port);
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
  } else if (TARGET_TYPE_VNODE == pSendInfo->target.type) {
    if (pTscObj == NULL) {
      tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
               TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
      return;
    }

    SCatalog* pCtg = NULL;
    int32_t   rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
    if (TSDB_CODE_SUCCESS != rc) {
      tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
               tstrerror(rc));
      return;
    }

    rc = catalogUpdateVgEpSet(pCtg, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
    if (TSDB_CODE_SUCCESS != rc) {
      tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
               tstrerror(rc));
      return;
    }

    // the db name is released only after a successful catalog update
    taosMemoryFreeClear(pSendInfo->target.dbFName);
  } else {
    tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
  }
}
1901

1902
/*
 * Deliver one RPC response to the handler stored in its SMsgSendInfo (pMsg->info.ahandle).
 *
 * Steps: look up the originating request (if the send info carries a ref id), apply any
 * endpoint-set change, copy the payload into a fresh buffer, invoke the handler, then release
 * the request ref, the RPC payload and the send info.
 *
 * Returns TSDB_CODE_TSC_INTERNAL_ERROR when there is no send info or the acquired request does
 * not match the ref id (pEpSet is freed in both cases); otherwise TSDB_CODE_SUCCESS.
 * pEpSet is otherwise handed to the handler through SDataBuf.pEpSet.
 */
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pMsg->info.ahandle == NULL) {
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
    rpcFreeCont(pMsg->pCont);
    taosMemoryFree(pEpSet);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  STscObj*     pTscObj = NULL;
  SRequestObj* pRequest = NULL;

  STraceId* trace = &pMsg->info.traceId;
  char      tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));

  if (pSendInfo->requestObjRefId != 0) {
    pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest) {
      if (pRequest->self != pSendInfo->requestObjRefId) {
        // hex conversion to match the "0x" prefix (was PRId64, which printed decimal after 0x)
        tscError("doProcessMsgFromServer req:0x%" PRIx64 " != pSendInfo->requestObjRefId:0x%" PRIx64, pRequest->self,
                 pSendInfo->requestObjRefId);

        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
          tscError("doProcessMsgFromServer taosReleaseRef failed");
        }
        rpcFreeCont(pMsg->pCont);
        taosMemoryFree(pEpSet);
        destroySendMsgInfo(pSendInfo);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pTscObj = pRequest->pTscObj;
    }
  }

  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

  SDataBuf buf = {.msgType = pMsg->msgType,
                  .len = pMsg->contLen,
                  .pData = NULL,
                  .handle = pMsg->info.handle,
                  .handleRefId = pMsg->info.refId,
                  .pEpSet = pEpSet};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      pMsg->code = terrno;
      // keep len consistent with the missing payload so the handler never sees len > 0 with NULL data
      buf.len = 0;
    } else {
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  // Release the ref taken above whenever a request was acquired; keying this on pTscObj
  // leaked the ref for a request whose pTscObj is NULL.
  if (pRequest) {
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("doProcessMsgFromServer taosReleaseRef failed");
      terrno = code;
      pMsg->code = code;
    }
  }

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
  return TSDB_CODE_SUCCESS;
}
1972

1973
int32_t doProcessMsgFromServer(void* param) {
1,368,581,804✔
1974
  AsyncArg* arg = (AsyncArg*)param;
1,368,581,804✔
1975
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
1,368,581,804✔
1976
  taosMemoryFree(arg);
1,368,538,789✔
1977
  return code;
1,368,547,144✔
1978
}
1979

1980
/*
 * RPC response callback: normalizes a few transport error codes, copies the message and the
 * optional endpoint set, and schedules doProcessMsgFromServer asynchronously.
 *
 * If copying or scheduling fails, the message is processed synchronously on the calling
 * thread with pMsg->code set to the failure code.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  int32_t   code = 0;
  SEpSet*   pEpCopy = NULL;
  AsyncArg* pArg = NULL;

  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);

  if (NULL != pEpSet) {
    pEpCopy = taosMemoryCalloc(1, sizeof(SEpSet));
    if (pEpCopy == NULL) {
      code = terrno;
      pMsg->code = terrno;
      goto _exit;
    }
    (void)memcpy((void*)pEpCopy, (void*)pEpSet, sizeof(SEpSet));
  }

  // pMsg is a response; TDMT_MND_CONNECT + 1 is the connect response type
  bool isConnectRsp = (pMsg->msgType == TDMT_MND_CONNECT + 1);
  if (isConnectRsp) {
    // restore the original code for connect responses
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
    }
  } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
    // everything else is unified to TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
    pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
  }

  pArg = taosMemoryCalloc(1, sizeof(AsyncArg));
  if (pArg == NULL) {
    code = terrno;
    pMsg->code = code;
    goto _exit;
  }

  pArg->msg = *pMsg;
  pArg->pEpset = pEpCopy;

  code = taosAsyncExec(doProcessMsgFromServer, pArg, NULL);
  if (code == 0) {
    return;
  }

  // scheduling failed: drop the task argument and fall through to synchronous handling
  pMsg->code = code;
  taosMemoryFree(pArg);

_exit:
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
  code = doProcessMsgFromServerImpl(pMsg, pEpCopy);
  if (code != 0) {
    tscError("failed to sched msg to tsc, tsc ready quit");
  }
}
2035

2036
TAOS* taos_connect_totp(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
1,848✔
2037
                        uint16_t port) {
2038
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
1,848✔
2039
  if (user == NULL) {
1,848✔
2040
    user = TSDB_DEFAULT_USER;
×
2041
  }
2042

2043
  if (pass == NULL) {
1,848✔
2044
    pass = TSDB_DEFAULT_PASS;
×
2045
  }
2046

2047
  STscObj* pObj = NULL;
1,848✔
2048
  int32_t  code = taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__QUERY, &pObj);
1,848✔
2049
  if (TSDB_CODE_SUCCESS == code) {
1,848✔
2050
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1,256✔
2051
    if (NULL == rid) {
1,256✔
2052
      tscError("out of memory when taos_connect_totp to %s:%u, user:%s db:%s", ip, port, user, db);
×
2053
      return NULL;
×
2054
    }
2055
    *rid = pObj->id;
1,256✔
2056
    return (TAOS*)rid;
1,256✔
2057
  } else {
2058
    terrno = code;
592✔
2059
  }
2060

2061
  return NULL;
592✔
2062
}
2063

2064
int taos_connect_test(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
×
2065
                      uint16_t port) {
2066
  tscInfo("try to test connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
×
2067
  if (user == NULL) {
×
2068
    user = TSDB_DEFAULT_USER;
×
2069
  }
2070

2071
  if (pass == NULL) {
×
2072
    pass = TSDB_DEFAULT_PASS;
×
2073
  }
2074

2075
  STscObj* pObj = NULL;
×
2076
  return taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__AUTH_TEST, &pObj);
×
2077
}
2078

2079
TAOS* taos_connect_token(const char* ip, const char* token, const char* db, uint16_t port) {
2,217✔
2080
  tscInfo("try to connect to %s:%u by token, db:%s", ip, port, db);
2,217✔
2081

2082
  STscObj* pObj = NULL;
2,217✔
2083
  int32_t  code = taos_connect_by_auth(ip, NULL, token, NULL, db, port, CONN_TYPE__QUERY, &pObj);
2,217✔
2084
  if (TSDB_CODE_SUCCESS == code) {
2,217✔
2085
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1,031✔
2086
    if (NULL == rid) {
1,031✔
2087
      tscError("out of memory when taos_connect_token to %s:%u db:%s", ip, port, db);
×
2088
      return NULL;
×
2089
    }
2090
    *rid = pObj->id;
1,031✔
2091
    return (TAOS*)rid;
1,031✔
2092
  } else {
2093
    terrno = code;
1,186✔
2094
  }
2095

2096
  return NULL;
1,186✔
2097
}
2098

2099
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
100✔
2100
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
100✔
2101
  if (user == NULL) {
100✔
2102
    user = TSDB_DEFAULT_USER;
×
2103
  }
2104

2105
  if (auth == NULL) {
100✔
2106
    tscError("No auth info is given, failed to connect to server");
×
2107
    return NULL;
×
2108
  }
2109

2110
  STscObj* pObj = NULL;
100✔
2111
  int32_t  code = taos_connect_by_auth(ip, user, auth, NULL, db, port, CONN_TYPE__QUERY, &pObj);
100✔
2112
  if (TSDB_CODE_SUCCESS == code) {
100✔
UNCOV
2113
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
×
UNCOV
2114
    if (NULL == rid) {
×
2115
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2116
    }
UNCOV
2117
    *rid = pObj->id;
×
UNCOV
2118
    return (TAOS*)rid;
×
2119
  }
2120

2121
  return NULL;
100✔
2122
}
2123

2124
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
2125
//                      const char* db, int dbLen, uint16_t port) {
2126
//   char ipStr[TSDB_EP_LEN] = {0};
2127
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
2128
//   char userStr[TSDB_USER_LEN] = {0};
2129
//   char passStr[TSDB_PASSWORD_LEN] = {0};
2130
//
2131
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
2132
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
2133
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
2134
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
2135
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
2136
// }
2137

2138
/*
 * Point pResultInfo->row[] / length[] at the cells of the row indexed by pResultInfo->current.
 *
 * NULL cells get row[i] = NULL and length[i] = 0. Variable-length cells are located through
 * the column offset array; fixed-length cells are located by schemaBytes * current.
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t colIdx = 0; colIdx < pResultInfo->numOfCols; ++colIdx) {
    SResultColumn* pColumn = &pResultInfo->pCol[colIdx];

    int32_t colType = pResultInfo->fields[colIdx].type;
    int32_t colBytes = calcSchemaBytesFromTypeBytes(colType, pResultInfo->userFields[colIdx].bytes, false);

    // decide first whether this cell carries a value at all
    bool hasValue;
    if (IS_VAR_DATA_TYPE(colType)) {
      hasValue = !IS_VAR_NULL_TYPE(colType, colBytes) && pColumn->offset[pResultInfo->current] != -1;
    } else {
      hasValue = !colDataIsNull_f(pColumn, pResultInfo->current);
    }

    if (!hasValue) {
      pResultInfo->row[colIdx] = NULL;
      pResultInfo->length[colIdx] = 0;
      continue;
    }

    if (!IS_VAR_DATA_TYPE(colType)) {
      pResultInfo->row[colIdx] = pColumn->pData + colBytes * pResultInfo->current;
      pResultInfo->length[colIdx] = colBytes;
      continue;
    }

    // variable-length cell: a length header followed by the payload
    char* pCell = pColumn->offset[pResultInfo->current] + pColumn->pData;
    if (IS_STR_DATA_BLOB(colType)) {
      pResultInfo->length[colIdx] = blobDataLen(pCell);
      pResultInfo->row[colIdx] = blobDataVal(pCell);
    } else {
      pResultInfo->length[colIdx] = varDataLen(pCell);
      pResultInfo->row[colIdx] = varDataVal(pCell);
    }
  }
}
2,147,483,647✔
2171

2172
/*
 * Synchronous fetch: return the current row array of the request, pulling the next block from
 * the scheduler when the cached block is missing or fully consumed.
 *
 * Returns NULL when pRequest is NULL, the result set is complete, a fetch/decode step fails
 * (pRequest->code holds the error) or the fetched block is empty. When setupOneRowPtr is true
 * the row pointers are set for the current row and the cursor is advanced by one.
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (NULL == pRequest) {
    return NULL;
  }

  SReqResultInfo* pRes = &pRequest->body.resInfo;
  bool            needFetch = (pRes->pData == NULL) || (pRes->current >= pRes->numOfRows);

  if (needFetch) {
    // all data has been returned to the app already, no need to try again
    if (pRes->completed) {
      pRes->numOfRows = 0;
      return NULL;
    }

    SSchedulerReq fetchReq = {.syncReq = true, .pFetchRes = (void**)&pRes->pData};

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &fetchReq);
    if (TSDB_CODE_SUCCESS == pRequest->code) {
      pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pRes->pData,
                                             convertUcs4, pRequest->stmtBindVersion > 0);
    }
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pRes->numOfRows = 0;
      return NULL;
    }

    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
             ", complete:%d, QID:0x%" PRIx64,
             pRequest->self, pRes->numOfRows, pRes->totalRows, pRes->completed, pRequest->requestId);

    // account the fetched payload in the app cluster summary
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    (void)atomic_add_fetch_64((int64_t*)&pSummary->fetchBytes, pRequest->body.resInfo.payloadLen);

    if (pRes->numOfRows == 0) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pRes);
    pRes->current += 1;
  }

  return pRes->row;
}
2221

2222
/*
 * Fetch-completion callback used by doAsyncFetchRows: `param` is the semaphore the waiting
 * caller blocks on; res and numOfRows are not used here.
 */
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
  tsem_t* pDone = param;

  if (tsem_post(pDone) != TSDB_CODE_SUCCESS) {
    tscError("failed to post sem, code:%s", terrstr());
  }
}
110,002,074✔
2228

2229
/*
 * Blocking wrapper around taos_fetch_rows_a: when the cached block is missing or consumed,
 * issue an async fetch and wait on a local semaphore until syncFetchFn posts it.
 *
 * Returns NULL when pRequest is NULL, the result set is complete, the block is empty or the
 * request carries an error code; otherwise the row array. When setupOneRowPtr is true the row
 * pointers are set for the current row and the cursor is advanced by one.
 */
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (NULL == pRequest) {
    return NULL;
  }

  SReqResultInfo* pRes = &pRequest->body.resInfo;
  if (pRes->pData == NULL || pRes->current >= pRes->numOfRows) {
    // all data has been returned to the app already, no need to try again
    if (pRes->completed) {
      pRes->numOfRows = 0;
      return NULL;
    }

    // convert ucs4 to native multi-bytes string
    pRes->convertUcs4 = convertUcs4;

    tsem_t fetchDone;
    if (tsem_init(&fetchDone, 0, 0) != TSDB_CODE_SUCCESS) {
      tscError("failed to init sem, code:%s", terrstr());
    }

    taos_fetch_rows_a(pRequest, syncFetchFn, &fetchDone);

    if (tsem_wait(&fetchDone) != TSDB_CODE_SUCCESS) {
      tscError("failed to wait sem, code:%s", terrstr());
    }
    if (tsem_destroy(&fetchDone) != TSDB_CODE_SUCCESS) {
      tscError("failed to destroy sem, code:%s", terrstr());
    }

    pRequest->inCallback = false;
  }

  if (pRequest->code != TSDB_CODE_SUCCESS || pRes->numOfRows == 0) {
    return NULL;
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pRes);
    pRes->current += 1;
  }

  return pRes->row;
}
2269

2270
/*
 * Lazily allocate the per-column arrays of a result info (row, pCol, length, convertBuf),
 * each sized by numOfCols. Does nothing when `row` is already allocated.
 *
 * Returns TSDB_CODE_SUCCESS, or the allocation error code. On failure all four pointers are
 * released AND reset to NULL, so a later call retries the allocation instead of treating the
 * freed `row` pointer as valid.
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
  pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
  pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

  if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
    // capture the allocation error before running the cleanup calls
    int32_t code = terrno;

    // the original freed these without clearing them, leaving dangling pointers behind
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2288

2289
// Converts every NCHAR column of the current block from UCS-4 to the multi-byte charset selected by
// pResultInfo->charsetCxt.
//
// For each NCHAR column with a positive colLength entry, the column's convertBuf is (re)allocated to
// colLength[i] bytes and every non-NULL row (offset != -1) is converted into it as a var-string. The row
// offsets are rewritten to point into the new buffer, and pCol[i].pData / row[i] are switched to it.
//
// Returns TSDB_CODE_SUCCESS, terrno when the buffer cannot be allocated, or TSDB_CODE_TSC_INTERNAL_ERROR when
// no converter can be acquired or a converted value fails the length / bounds validation.
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
  int32_t convIdx = -1;
  iconv_t conv = taosAcquireConv(&convIdx, C2M, pResultInfo->charsetCxt);
  if (conv == (iconv_t)-1) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    int32_t colType = pResultInfo->fields[col].type;
    int32_t maxBytes =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[col].type, pResultInfo->fields[col].bytes, isStmt);

    // only non-empty NCHAR columns need conversion
    if (colType != TSDB_DATA_TYPE_NCHAR || colLength[col] <= 0) {
      continue;
    }

    char* pWrite = taosMemoryRealloc(pResultInfo->convertBuf[col], colLength[col]);
    if (pWrite == NULL) {
      taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
      return terrno;
    }

    pResultInfo->convertBuf[col] = pWrite;
    char* const pBufEnd = pWrite + colLength[col];

    SResultColumn* pColumn = &pResultInfo->pCol[col];
    for (int32_t row = 0; row < pResultInfo->numOfRows; ++row) {
      if (pColumn->offset[row] == -1) {
        continue;  // NULL value, nothing to convert
      }

      char*   pSrc = pColumn->pData + pColumn->offset[row];
      int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pSrc), varDataLen(pSrc), varDataVal(pWrite), conv);

      bool valid = (len >= 0) && (len <= maxBytes) && ((pWrite + len) < pBufEnd);
      if (!valid) {
        tscError(
            "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
            "colLength[i]):%p",
            len, maxBytes, (pWrite + len), pBufEnd);
        taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      // finish the var-string header and redirect the row to the converted copy
      varDataSetLen(pWrite, len);
      pColumn->offset[row] = (pWrite - pResultInfo->convertBuf[col]);
      pWrite += (len + VARSTR_HEADER_SIZE);
    }

    pColumn->pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pColumn->pData;
  }

  taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
  return TSDB_CODE_SUCCESS;
}
2336

2337
// Replaces the binary payload of every decimal column that carries data with its textual form.
//
// Each row is rendered by decimalToStr() into a fixed 64-byte slot of the column's convertBuf; the buffer is
// (re)allocated to 64 * numOfRows bytes. The reported width of the column (fields[i].bytes and
// userFields[i].bytes) is set to 64, and pCol[i].pData / row[i] are switched to the converted buffer.
//
// Returns 0 on success, terrno when the buffer cannot be allocated, or the first error reported by
// decimalToStr().
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + col;
    TAOS_FIELD*   pField = pResultInfo->userFields + col;
    int32_t       type = pFieldE->type;

    // skip non-decimal columns and decimal columns without data
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[col].pData) {
      continue;
    }

    int32_t slotLen = 64;
    char*   pOut = taosMemoryRealloc(pResultInfo->convertBuf[col], slotLen * pResultInfo->numOfRows);
    pFieldE->bytes = slotLen;
    pField->bytes = slotLen;
    if (!pOut) {
      return terrno;
    }
    pResultInfo->convertBuf[col] = pOut;

    for (int32_t row = 0; row < pResultInfo->numOfRows; ++row) {
      DecimalWord* pValue = (DecimalWord*)(pResultInfo->pCol[col].pData + row * tDataTypes[type].bytes);
      int32_t      code = decimalToStr(pValue, type, pFieldE->precision, pFieldE->scale, pOut, slotLen);
      pOut += slotLen;
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
    }

    pResultInfo->pCol[col].pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pResultInfo->pCol[col].pData;
  }

  return 0;
}
2368

2369
// Returns the size in bytes of the leading meta section of a data block with numOfCols columns:
// five int32_t words (version, total length, total rows, total columns, flag segment), the uint64_t block
// group id, and one (int8_t type, int32_t bytes) schema entry per column.
// The p argument is not used.
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
  const size_t fixedPart = sizeof(int32_t) * 5 + sizeof(uint64_t);
  const size_t perColumn = sizeof(int8_t) + sizeof(int32_t);

  return fixedPart + numOfCols * perColumn;
}
2373

2374
// Estimates how many bytes the current data block needs once its JSON columns are rewritten as text.
//
// Walks the block in pResultInfo->pData column by column. Non-JSON columns contribute their offset array
// (variable-length types) or null bitmap (fixed-length types) plus their stored length. JSON columns
// contribute their offset array plus the larger of the stored length and a per-row estimate based on the
// inner JSON value type.
//
// Returns the estimated size, TSDB_CODE_TSC_INTERNAL_ERROR when the column count in the block header does not
// match pResultInfo->numOfCols, or -1 when a JSON value has an unexpected inner type.
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
  char*   pBlock = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)pBlock;

  int32_t rowCount = pResultInfo->numOfRows;
  int32_t colCount = pResultInfo->numOfCols;

  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
  // length |
  int32_t cols = *(int32_t*)(pBlock + sizeof(int32_t) * 3);
  if (colCount != cols) {
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", colCount, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t  total = getVersion1BlockMetaSize(pBlock, colCount);
  int32_t* colLength = (int32_t*)(pBlock + total);
  total += sizeof(int32_t) * colCount;

  char* pCursor = pBlock + total;
  for (int32_t col = 0; col < colCount; ++col) {
    int32_t colLen = colLength[col];
    if (blockVersion == BLOCK_VERSION_1) {
      colLen = htonl(colLength[col]);
    }

    bool isJson = (pResultInfo->fields[col].type == TSDB_DATA_TYPE_JSON);

    // every column starts with either a per-row offset array or a null bitmap
    int32_t prefixLen = 0;
    if (isJson || IS_VAR_DATA_TYPE(pResultInfo->fields[col].type)) {
      prefixLen = rowCount * sizeof(int32_t);
    } else {
      prefixLen = BitmapLen(pResultInfo->numOfRows);
    }

    int32_t* offset = (int32_t*)pCursor;
    total += prefixLen;
    pCursor += prefixLen;

    if (!isJson) {
      total += colLen;
      pCursor += colLen;
      continue;
    }

    int32_t estimateColLen = 0;
    for (int32_t row = 0; row < rowCount; ++row) {
      if (offset[row] == -1) {
        continue;
      }

      char*   data = offset[row] + pCursor;
      int32_t jsonInnerType = *data;
      char*   jsonInnerData = data + CHAR_BYTES;

      if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
        estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
      } else if (tTagIsJson(data)) {
        estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
      } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
        estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
      } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
        estimateColLen += (VARSTR_HEADER_SIZE + 32);
      } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
        estimateColLen += (VARSTR_HEADER_SIZE + 5);
      } else if (IS_STR_DATA_BLOB(jsonInnerType)) {
        estimateColLen += (BLOBSTR_HEADER_SIZE + 32);
      } else {
        tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
        return -1;
      }
    }

    total += TMAX(colLen, estimateColLen);
    pCursor += colLen;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  total += sizeof(bool);
  return total;
}
2447

2448
// Rewrites the current data block so that every JSON column holds printable text.
//
// When the result has no JSON column this is a no-op. Otherwise a new block of estimateJsonLen() bytes is
// allocated in pResultInfo->convertJson (any previous one is released first). The block header, the column
// length array and all non-JSON columns are copied unchanged. Each non-NULL JSON value is stored as a
// var-string according to its inner type:
//   - NULL    : the TSDB_DATA_NULL_STR_L literal
//   - tag JSON: the text produced by parseTagDatatoJson()
//   - NCHAR   : the value converted from UCS-4, escaped and wrapped in double quotes
//   - DOUBLE  : formatted with "%.9lf"
//   - BOOL    : "true" or "false"
// The per-column length and the total length in the header are updated, and pResultInfo->pData is switched
// to the new block.
//
// Returns TSDB_CODE_SUCCESS, terrno on allocation / tag parsing failure, or TSDB_CODE_TSC_INTERNAL_ERROR when
// the block is inconsistent.
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
  int32_t numOfRows = pResultInfo->numOfRows;
  int32_t numOfCols = pResultInfo->numOfCols;
  bool    needConvert = false;
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      needConvert = true;
      break;
    }
  }

  if (!needConvert) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to convert form json format string");

  char*   p = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)p;
  int32_t dataLen = estimateJsonLen(pResultInfo);
  if (dataLen <= 0) {
    tscError("doConvertJson error: estimateJsonLen failed");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->convertJson);
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
  if (pResultInfo->convertJson == NULL) return terrno;
  char* p1 = pResultInfo->convertJson;

  int32_t totalLen = 0;
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
  if (numOfCols != cols) {
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // copy the fixed header and the column schema
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
  (void)memcpy(p1, p, len);

  p += len;
  p1 += len;
  totalLen += len;

  // copy the column length array; the entries of JSON columns are overwritten below
  len = sizeof(int32_t) * numOfCols;
  int32_t* colLength = (int32_t*)p;
  int32_t* colLength1 = (int32_t*)p1;
  (void)memcpy(p1, p, len);
  p += len;
  p1 += len;
  totalLen += len;

  char* pStart = p;    // read cursor in the source block
  char* pStart1 = p1;  // write cursor in the converted block
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
    if (colLen >= dataLen) {
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      int32_t* offset = (int32_t*)pStart;
      int32_t* offset1 = (int32_t*)pStart1;
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;

      len = 0;  // from here on: number of converted bytes written for this column
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {
          continue;
        }
        char* data = offset[j] + pStart;

        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (tTagIsJson(data)) {
          char* jsonString = NULL;
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
          if (jsonString == NULL) {
            tscError("doConvertJson error: parseTagDatatoJson failed");
            return terrno;
          }
          STR_TO_VARSTR(dst, jsonString);
          taosMemoryFree(jsonString);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          *(char*)varDataVal(dst) = '\"';
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(tmp),
                                         pResultInfo->charsetCxt);
          if (length <= 0) {
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
            length = 0;
          }
          // The escaped text starts after the var-string header and the opening quote, and the closing quote
          // must still fit, so the room left in dst is the buffer size minus the header and both quotes.
          // NOTE(review): assumes the second argument of escapeToPrinted is the destination capacity - confirm.
          int32_t escapeLength =
              escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE - CHAR_BYTES * 2,
                              varDataVal(tmp), length);
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
          // a successfully converted value is not an error; keep it out of the error log
          tscDebug("value:%s.", varDataVal(dst));
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          double jsonVd = *(double*)(jsonInnerData);
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else {
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        offset1[j] = len;
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
        len += varDataTLen(dst);
      }
      colLen1 = len;
      totalLen += colLen1;
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    } else {
      len = BitmapLen(pResultInfo->numOfRows);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    }
    pStart += colLen;
    pStart1 += colLen1;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side. The byte itself is not copied from the source block.
  totalLen += sizeof(bool);

  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
  pResultInfo->pData = pResultInfo->convertJson;
  return TSDB_CODE_SUCCESS;
}
2605

2606
// Parses the data block in pResultInfo->pData and points the per-column members of pResultInfo at it.
//
// Steps, in order:
//   1. validate the arguments; an empty result (numOfRows == 0) succeeds immediately
//   2. allocate the per-column arrays (doPrepareResPtr) and convert JSON columns (doConvertJson)
//   3. read the block header (version, total length, rows, columns, flag segment, group id) and check rows /
//      columns against pResultInfo
//   4. walk the column schema, filling in precision / scale of decimal columns that do not have them yet
//   5. walk the columns, recording offset array or null bitmap, data pointer, schema length and row pointer
//   6. when convertUcs4 is set, convert NCHAR columns (unless DISALLOW_NCHAR_WITHOUT_ICONV is defined) and
//      decimal columns to their client-side representation
//
// For BLOCK_VERSION_1 blocks the column length array is byte-swapped in place.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR for invalid arguments or an inconsistent block, or the
// error reported by one of the conversion helpers.
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
    tscError("setResultDataPtr paras error");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pResultInfo->numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pResultInfo->pData == NULL) {
    tscError("setResultDataPtr error: pData is NULL");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  code = doConvertJson(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  char* p = (char*)pResultInfo->pData;

  // version:
  int32_t blockVersion = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t dataLen = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t rows = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t cols = *(int32_t*)p;
  p += sizeof(int32_t);

  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // flag segment and block group id are not needed here; skip over them
  p += sizeof(int32_t);
  p += sizeof(uint64_t);

  // check fields
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    int8_t type = *(int8_t*)p;
    p += sizeof(int8_t);

    int32_t bytes = *(int32_t*)p;
    p += sizeof(int32_t);

    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
    }
  }

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * pResultInfo->numOfCols;

  char* pStart = p;
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    if ((pStart - pResultInfo->pData) >= dataLen) {
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (blockVersion == BLOCK_VERSION_1) {
      colLength[i] = htonl(colLength[i]);
    }
    // A negative length would pass the upper-bound test and move pStart backwards, so reject it as well.
    if (colLength[i] < 0 || colLength[i] >= dataLen) {
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
      tscError("invalid type %d", pResultInfo->fields[i].type);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
    pResultInfo->length[i] =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
  }

  p = pStart;
  // bool blankFill = *(bool*)p;
  p += sizeof(bool);
  int32_t offset = p - pResultInfo->pData;
  if (offset > dataLen) {
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
  }
#endif
  // decimal columns are converted under the same switch as NCHAR columns
  if (TSDB_CODE_SUCCESS == code && convertUcs4) {
    code = convertDecimalType(pResultInfo);
  }
  return code;
}
2726

2727
// Returns a heap-allocated copy of the connection's current database name, made while holding the
// connection mutex. Returns NULL when no database is set or when the copy cannot be allocated.
// terrno is reset to TSDB_CODE_SUCCESS on entry.
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;
  terrno = TSDB_CODE_SUCCESS;

  (void)taosThreadMutexLock(&pObj->mutex);

  if (strlen(pObj->db) > 0) {
    dbCopy = taosStrndup(pObj->db, tListLen(pObj->db));
    if (dbCopy == NULL) {
      tscError("failed to taosStrndup db name");
    }
  }

  (void)taosThreadMutexUnlock(&pObj->mutex);
  return dbCopy;
}
2742

2743
// Stores db as the current database name of the connection, copying it into pTscObj->db while holding the
// connection mutex. Logs an error and does nothing when either argument is NULL.
void setConnectionDB(STscObj* pTscObj, const char* db) {
  if (db != NULL && pTscObj != NULL) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
    return;
  }

  tscError("setConnectionDB para is NULL");
}
2753

2754
// Clears the current database name of the connection (sets it to the empty string) while holding the
// connection mutex. A NULL connection is ignored.
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj != NULL) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = 0;
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
  }
}
2763

2764
// Installs a retrieve response as the current result block of pResultInfo.
//
// The previously held response message is released and pRsp is stored in its place (pResultInfo->pRspMsg).
// Row count, completion flag and precision are taken from the response header. When the response carries a
// payload, its first two int32_t words are the compressed and raw lengths, followed by the data:
//   - compressed responses whose compressed length is smaller than the raw length are decompressed into
//     pResultInfo->decompBuf, which is grown on demand to the payload length announced in the header
//   - otherwise the data is used in place, and the header's compLen must equal its payloadLen
// Finally the block is parsed by setResultDataPtr().
//
// Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or an error code when the response is inconsistent
// or decompression fails.
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
                              bool isStmt) {
  if (pResultInfo == NULL || pRsp == NULL) {
    tscError("setQueryResultFromRsp paras is null");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->pRspMsg);
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->precision = pRsp->precision;

  // decompress data if needed
  int32_t payloadLen = htonl(pRsp->payloadLen);

  if (pRsp->compressed) {
    if (pResultInfo->decompBuf == NULL) {
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
      if (pResultInfo->decompBuf == NULL) {
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
        return terrno;
      }
      pResultInfo->decompBufSize = payloadLen;
    } else {
      if (pResultInfo->decompBufSize < payloadLen) {
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
        if (p == NULL) {
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
          return terrno;
        }

        pResultInfo->decompBuf = p;
        pResultInfo->decompBufSize = payloadLen;
      }
    }
  }

  if (payloadLen > 0) {
    int32_t compLen = *(int32_t*)pRsp->data;
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));

    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;

    if (pRsp->compressed && compLen < rawLen) {
      // Both lengths come from the message body while the buffer was sized from the header. Refuse to
      // decompress when the output would not fit, instead of writing past the end of decompBuf.
      if (compLen < 0 || rawLen > pResultInfo->decompBufSize) {
        tscError("invalid compressed payload, compLen:%d rawLen:%d payloadLen:%d", compLen, rawLen, payloadLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      if (len < 0) {
        tscError("tsDecompressString failed");
        return terrno ? terrno : TSDB_CODE_FAILED;
      }
      if (len != rawLen) {
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pResultInfo->pData = pResultInfo->decompBuf;
      pResultInfo->payloadLen = rawLen;
    } else {
      pResultInfo->pData = pStart;
      pResultInfo->payloadLen = htonl(pRsp->compLen);
      if (pRsp->compLen != pRsp->payloadLen) {
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
    }
  }

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
  return code;
}
2837

2838
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
196✔
2839
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
196✔
2840
  void*              clientRpc = NULL;
196✔
2841
  SServerStatusRsp   statusRsp = {0};
196✔
2842
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
196✔
2843
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
196✔
2844
  SRpcMsg  rpcRsp = {0};
196✔
2845
  SRpcInit rpcInit = {0};
196✔
2846
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
196✔
2847

2848
  rpcInit.label = "CHK";
196✔
2849
  rpcInit.numOfThreads = 1;
196✔
2850
  rpcInit.cfp = NULL;
196✔
2851
  rpcInit.sessions = 16;
196✔
2852
  rpcInit.connType = TAOS_CONN_CLIENT;
196✔
2853
  rpcInit.idleTime = tsShellActivityTimer * 1000;
196✔
2854
  rpcInit.compressSize = tsCompressMsgSize;
196✔
2855
  rpcInit.user = "_dnd";
196✔
2856

2857
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
196✔
2858
  connLimitNum = TMAX(connLimitNum, 10);
196✔
2859
  connLimitNum = TMIN(connLimitNum, 500);
196✔
2860
  rpcInit.connLimitNum = connLimitNum;
196✔
2861
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
196✔
2862
  rpcInit.readTimeout = tsReadTimeout;
196✔
2863
  rpcInit.ipv6 = tsEnableIpv6;
196✔
2864
  rpcInit.enableSSL = tsEnableTLS;
196✔
2865

2866
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
196✔
2867
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
196✔
2868
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
196✔
2869
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
196✔
2870
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
196✔
2871

2872
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
196✔
2873
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2874
    goto _OVER;
×
2875
  }
2876

2877
  clientRpc = rpcOpen(&rpcInit);
196✔
2878
  if (clientRpc == NULL) {
196✔
2879
    code = terrno;
×
2880
    tscError("failed to init server status client since %s", tstrerror(code));
×
2881
    goto _OVER;
×
2882
  }
2883

2884
  if (fqdn == NULL) {
196✔
2885
    fqdn = tsLocalFqdn;
196✔
2886
  }
2887

2888
  if (port == 0) {
196✔
2889
    port = tsServerPort;
196✔
2890
  }
2891

2892
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
196✔
2893
  epSet.eps[0].port = (uint16_t)port;
196✔
2894
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
196✔
2895
  if (TSDB_CODE_SUCCESS != ret) {
196✔
2896
    tscError("failed to send recv since %s", tstrerror(ret));
×
2897
    goto _OVER;
×
2898
  }
2899

2900
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
196✔
2901
    tscError("failed to send server status req since %s", terrstr());
46✔
2902
    goto _OVER;
46✔
2903
  }
2904

2905
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
150✔
2906
    tscError("failed to parse server status rsp since %s", terrstr());
×
2907
    goto _OVER;
×
2908
  }
2909

2910
  code = statusRsp.statusCode;
150✔
2911
  if (details != NULL) {
150✔
2912
    tstrncpy(details, statusRsp.details, maxlen);
150✔
2913
  }
2914

2915
_OVER:
183✔
2916
  if (clientRpc != NULL) {
196✔
2917
    rpcClose(clientRpc);
196✔
2918
  }
2919
  if (rpcRsp.pCont != NULL) {
196✔
2920
    rpcFreeCont(rpcRsp.pCont);
150✔
2921
  }
2922
  return code;
196✔
2923
}
2924

2925
// Adds one table name to the per-database request map.
//
// pHash      : map from full database name ("<acctId>.<db>") to STablesReq
// pos1, len1 : position / length in str of the first name component
// pos2, len2 : position / length in str of the second name component; len2 <= 0 means there is none
// str        : the text the positions refer to
// acctId     : account id used to build the full database name
// db         : default database, used when the name has a single component
//
// With two components the first is the database and the second the table; with one component it is the
// table and db supplies the database. The table is appended to the STablesReq of its database, which is
// created and inserted into pHash on first use.
//
// Returns TSDB_CODE_SUCCESS, -1 when the name is incomplete or rejected by the name helpers, or an error code
// when memory cannot be allocated or the map insert fails. A table list created here is released again when
// it cannot be handed over to the map.
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
                      int32_t acctId, char* db) {
  SName name = {0};

  if (len1 <= 0) {
    return -1;
  }

  const char* dbName = db;
  const char* tbName = NULL;
  int32_t     dbLen = 0;
  int32_t     tbLen = 0;
  if (len2 > 0) {
    dbName = str + pos1;
    dbLen = len1;
    tbName = str + pos2;
    tbLen = len2;
  } else {
    // no database in the name and no default database either
    if (db == NULL) {
      return -1;
    }
    dbLen = strlen(db);
    tbName = str + pos1;
    tbLen = len1;
  }

  if (dbLen <= 0 || tbLen <= 0) {
    return -1;
  }

  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
    return -1;
  }

  if (tNameAddTbName(&name, tbName, tbLen)) {
    return -1;
  }

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);

  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
  if (pDb) {
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  // first table of this database: build a new entry and hand it over to the map
  STablesReq db1 = {0};
  db1.pTables = taosArrayInit(20, sizeof(SName));
  if (NULL == db1.pTables) {
    return terrno;
  }
  tstrncpy(db1.dbFName, dbFName, TSDB_DB_FNAME_LEN);

  if (NULL == taosArrayPush(db1.pTables, &name)) {
    int32_t code = terrno;
    taosArrayDestroy(db1.pTables);  // not yet owned by the map
    return code;
  }

  int32_t code = taosHashPut(pHash, dbFName, strlen(dbFName), &db1, sizeof(db1));
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(db1.pTables);  // the map did not take the entry
    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2983

2984
// Parses a comma separated list of table names and groups the tables by database.
//
// tbList : NUL-terminated list such as "t1, db2.t2, `odd name`"
// acctId : account id used to build full database names
// dbName : default database for names without a database part
// pReq   : on success receives a new array of STablesReq, one entry per database
//
// Grammar handled by the scanner:
//   - a name is "table" or "db.table"; more than one '.' is rejected
//   - unquoted components consist of letters, digits and '_'
//   - a component may be enclosed in backticks, in which case every character up to the closing backtick
//     belongs to it
//   - blanks (space, tab, CR, LF) end an unquoted component; further name characters after that are rejected
//   - ',' separates names, and the end of the string ends the last one
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_TSC_INVALID_OPERATION (also stored in terrno) when the list is
// malformed or anything else fails. On failure everything built so far is released and *pReq is not
// modified.
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return terrno;
  }

  bool    inEscape = false;
  int32_t code = 0;
  void*   pIter = NULL;
  SArray* pList = NULL;  // result under construction; handed to the caller only on success

  // vPos / vLen hold start and length of up to two components (db, table) of the name being scanned;
  // a position of -1 means "not started", a length of 0 means "not finished"
  int32_t vIdx = 0;
  int32_t vPos[2];
  int32_t vLen[2];

  (void)memset(vPos, -1, sizeof(vPos));
  (void)memset(vLen, 0, sizeof(vLen));

  for (int32_t i = 0;; ++i) {
    const char ch = *(tbList + i);

    if (0 == ch) {
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      break;
    }

    if ('`' == ch) {
      inEscape = !inEscape;
      if (!inEscape) {
        // closing backtick: the quoted component must not be empty
        if (vPos[vIdx] >= 0) {
          vLen[vIdx] = i - vPos[vIdx];
        } else {
          goto _return;
        }
      }

      continue;
    }

    if (inEscape) {
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    if ('.' == ch) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      vIdx++;
      if (vIdx >= 2) {
        goto _return;
      }
      continue;
    }

    if (',' == ch) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      (void)memset(vPos, -1, sizeof(vPos));
      (void)memset(vLen, 0, sizeof(vLen));
      vIdx = 0;
      continue;
    }

    if (' ' == ch || '\r' == ch || '\t' == ch || '\n' == ch) {
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      continue;
    }

    if (('a' <= ch && 'z' >= ch) || ('A' <= ch && 'Z' >= ch) || ('0' <= ch && '9' >= ch) || ('_' == ch)) {
      // a finished component followed by more name characters, e.g. "a b"
      if (vLen[vIdx] > 0) {
        goto _return;
      }
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    goto _return;
  }

  pList = taosArrayInit(taosHashGetSize(pHash), sizeof(STablesReq));
  if (NULL == pList) {
    goto _return;
  }

  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    if (NULL == taosArrayPush(pList, pDb)) {
      goto _return;
    }
    pIter = taosHashIterate(pHash, pIter);
  }

  // the entries of pList now own the per-database table arrays; only the map itself is released
  taosHashCleanup(pHash);

  *pReq = pList;
  return TSDB_CODE_SUCCESS;

_return:

  // The map entries still own the table arrays (entries copied into pList merely share the pointers), so
  // destroy them through the map and drop pList without touching its elements.
  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    taosArrayDestroy(pDb->pTables);
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);
  taosArrayDestroy(pList);

  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
  return TSDB_CODE_TSC_INVALID_OPERATION;
}
3122

3123
// Catalog completion callback for synchronous callers.
// Records `code` on the request carried by the SSyncQueryParam passed as
// `param`, then posts the semaphore stored in that same param. `pResult` is
// not used here.
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
  SSyncQueryParam* pSync = (SSyncQueryParam*)param;

  pSync->pRequest->code = code;

  if (tsem_post(&pSync->sem) != TSDB_CODE_SUCCESS) {
    tscError("failed to post semaphore since %s", tstrerror(terrno));
  }
}
1,126✔
3131

3132
void syncQueryFn(void* param, void* res, int32_t code) {
796,576,570✔
3133
  SSyncQueryParam* pParam = param;
796,576,570✔
3134
  pParam->pRequest = res;
796,576,570✔
3135

3136
  if (pParam->pRequest) {
796,579,502✔
3137
    pParam->pRequest->code = code;
796,569,949✔
3138
    clientOperateReport(pParam->pRequest);
796,575,586✔
3139
  }
3140

3141
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
796,567,166✔
3142
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3143
  }
3144
}
796,580,538✔
3145

3146
// Starts an asynchronous query on connection `connId`.
//
// On any validation or setup failure, terrno is set and `fp` (when non-NULL)
// is invoked with a NULL result and that error code; nothing is started.
// Otherwise the built request is tagged with `source` and `fp`, and handed to
// doAsyncQuery().
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                        int8_t source) {
  // Without a callback there is nobody to notify; just record the error.
  if (NULL == fp) {
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  const size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)tsMaxSQLLength) {
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, tsMaxSQLLength);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
  if (TSDB_CODE_SUCCESS == code) {
    code = connCheckAndUpateMetric(connId);
  }

  if (TSDB_CODE_SUCCESS != code) {
    // NOTE(review): when buildRequest() succeeded but the connection check
    // failed, pRequest is not released on this path — confirm whether that
    // leaks the request.
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->source = source;
  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
}
3186

3187
// Starts an asynchronous query on connection `connId`, using the
// caller-supplied request id `reqid`.
//
// On any validation or setup failure, terrno is set and `fp` (when non-NULL)
// is invoked with a NULL result and that error code; nothing is started.
// Otherwise `fp` is attached to the built request, which is handed to
// doAsyncQuery().
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                                 int64_t reqid) {
  // Without a callback there is nobody to notify; just record the error.
  if (NULL == fp) {
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  const size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)tsMaxSQLLength) {
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid, tsMaxSQLLength);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
  if (TSDB_CODE_SUCCESS == code) {
    code = connCheckAndUpateMetric(connId);
  }

  if (TSDB_CODE_SUCCESS != code) {
    // NOTE(review): when buildRequest() succeeded but the connection check
    // failed, pRequest is not released on this path — confirm whether that
    // leaks the request.
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
}
3228

3229
// Synchronous query: issues the SQL through taosAsyncQueryImpl() with
// syncQueryFn as the callback and blocks on a semaphore until that callback
// posts it.
//
// Returns the resulting request (marked syncQuery, with inCallback cleared),
// or NULL when `taos` is NULL (terrno = TSDB_CODE_TSC_DISCONNECTED), when
// allocation or semaphore setup/wait fails, or when the callback delivered no
// request.
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
  if (taos == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* pSync = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (pSync == NULL) {
    return NULL;
  }

  if (tsem_init(&pSync->sem, 0, 0) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, pSync, validateOnly, source);

  if (tsem_wait(&pSync->sem) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  // A failed destroy is only logged; the query result is still returned.
  int32_t code = tsem_destroy(&pSync->sem);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  SRequestObj* pRequest = pSync->pRequest;
  if (pRequest != NULL) {
    pRequest->syncQuery = true;
    pRequest->inCallback = false;
  }

  taosMemoryFree(pSync);
  return pRequest;
}
3270

3271
// Synchronous query with a caller-supplied request id: issues the SQL through
// taosAsyncQueryImplWithReqid() with syncQueryFn as the callback and blocks on
// a semaphore until that callback posts it.
//
// Returns the resulting request (marked syncQuery), or NULL when `taos` is
// NULL (terrno = TSDB_CODE_TSC_DISCONNECTED), when allocation or semaphore
// setup/wait fails, or when the callback delivered no request.
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (param == NULL) {
    return NULL;
  }

  int32_t code = tsem_init(&param->sem, 0, 0);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);

  code = tsem_wait(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  // Release the semaphore before freeing the memory that holds it, matching
  // taosQueryImpl(); previously it was initialized but never destroyed.
  // A failed destroy is only logged; the query result is still returned.
  code = tsem_destroy(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  SRequestObj* pRequest = NULL;
  if (param->pRequest != NULL) {
    // NOTE(review): taosQueryImpl() also clears pRequest->inCallback here;
    // confirm whether this variant should do the same.
    param->pRequest->syncQuery = true;
    pRequest = param->pRequest;
  }
  taosMemoryFree(param);

  return pRequest;
}
3305

3306
static void fetchCallback(void* pResult, void* param, int32_t code) {
138,534,523✔
3307
  SRequestObj* pRequest = (SRequestObj*)param;
138,534,523✔
3308

3309
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
138,534,523✔
3310

3311
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
138,534,523✔
3312
           tstrerror(code), pRequest->requestId);
3313

3314
  pResultInfo->pData = pResult;
138,534,523✔
3315
  pResultInfo->numOfRows = 0;
138,534,523✔
3316

3317
  if (code != TSDB_CODE_SUCCESS) {
138,534,507✔
3318
    pRequest->code = code;
×
3319
    taosMemoryFreeClear(pResultInfo->pData);
×
3320
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3321
    return;
×
3322
  }
3323

3324
  if (pRequest->code != TSDB_CODE_SUCCESS) {
138,534,507✔
3325
    taosMemoryFreeClear(pResultInfo->pData);
×
3326
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3327
    return;
×
3328
  }
3329

3330
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
159,107,173✔
3331
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
138,534,478✔
3332
  if (pRequest->code != TSDB_CODE_SUCCESS) {
138,532,022✔
3333
    pResultInfo->numOfRows = 0;
449✔
3334
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
449✔
3335
             tstrerror(pRequest->code), pRequest->requestId);
3336
  } else {
3337
    tscDebug(
138,531,503✔
3338
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3339
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3340

3341
    STscObj*            pTscObj = pRequest->pTscObj;
138,533,235✔
3342
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
138,534,099✔
3343
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
138,534,060✔
3344
  }
3345

3346
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
138,534,552✔
3347
}
3348

3349
// Asynchronously fetches the next batch of rows for `pRequest`.
//
// `fp` and `param` are recorded on the request. The callback is invoked
// immediately when the request has no fields, already carries an error, or
// its result set is marked completed; otherwise a fetch is scheduled and
// fetchCallback() takes over.
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  pRequest->body.fetchFp = fp;
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;

  // Nothing to fetch: no result columns, or the request already failed.
  if (0 == taos_num_fields(pRequest) || TSDB_CODE_SUCCESS != pRequest->code) {
    pResultInfo->numOfRows = 0;
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
    return;
  }

  // Everything has already been returned to the application.
  if (pResultInfo->completed) {
    const bool isLocalExec = (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode);

    if (isLocalExec && !pResultInfo->localResultFetched) {
      // First fetch of a locally executed query: keep numOfRows as it is and
      // remember that the local result has been handed out.
      pResultInfo->localResultFetched = true;
    } else {
      pResultInfo->numOfRows = 0;
      if (isLocalExec) {
        pResultInfo->current = 0;
      }
    }

    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
    return;
  }

  SSchedulerReq schedReq = {
      .syncReq = false,
      .fetchFp = fetchCallback,
      .cbParam = pRequest,
  };

  // On failure only an error is logged here; the user callback is not
  // invoked from this function.
  if (schedulerFetchRows(pRequest->body.queryJob, &schedReq) != TSDB_CODE_SUCCESS) {
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
  }
}
3392

3393
// Delivers the final result `code` of `pRequest` to its query callback.
//
// The request is flagged inCallback for the duration of the call. When
// tsQueryTbNotExistAsEmpty is set and a query failed only because the table
// does not exist, the failure is converted to success and the request type is
// set to TSDB_SQL_RETRIEVE_EMPTY_RESULT.
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
  pRequest->inCallback = true;

  // Saved up front so the request can be looked up again by id after the
  // callback has run, instead of going through pRequest.
  const int64_t refId = pRequest->self;

  const bool tableMissing = (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) || (TSDB_CODE_TDB_TABLE_NOT_EXIST == code);
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && tableMissing) {
    code = TSDB_CODE_SUCCESS;
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
  }

  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
           pRequest);

  if (NULL != pRequest->body.queryFp) {
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
  }

  // Clear the flag only if the request can still be acquired by its id.
  SRequestObj* pAcquired = acquireRequest(refId);
  if (NULL != pAcquired) {
    pAcquired->inCallback = false;
    (void)releaseRequest(refId);
  }
}
796,614,902✔
3415

3416
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
85,942✔
3417
                       SParseSqlRes* pRes) {
3418
#ifndef TD_ENTERPRISE
3419
  return TSDB_CODE_SUCCESS;
3420
#else
3421
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
85,942✔
3422
#endif
3423
}
3424

3425
// Stamps `pInfo` with the current millisecond timestamp: startTime is set
// only while it is still 0, lastAccessTime is always refreshed.
// A NULL `pInfo` is ignored.
void updateConnAccessInfo(SConnAccessInfo* pInfo) {
  if (NULL != pInfo) {
    const int64_t nowMs = taosGetTimestampMs();

    if (0 == pInfo->startTime) {
      pInfo->startTime = nowMs;
    }
    pInfo->lastAccessTime = nowMs;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc