• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4890

19 Dec 2025 11:37AM UTC coverage: 62.824% (-2.7%) from 65.487%
#4890

push

travis-ci

web-flow
feat: support TOTP code login and password expired tip (#33969)

22 of 26 new or added lines in 3 files covered. (84.62%)

1989 existing lines in 120 files now uncovered.

63068 of 100389 relevant lines covered (62.82%)

286770988.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.61
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "decimal.h"
22
#include "scheduler.h"
23
#include "tdatablock.h"
24
#include "tdataformat.h"
25
#include "tdef.h"
26
#include "tglobal.h"
27
#include "tmisce.h"
28
#include "tmsg.h"
29
#include "tmsgtype.h"
30
#include "tpagedbuf.h"
31
#include "tref.h"
32
#include "tsched.h"
33
#include "tversion.h"
34

35
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
36
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode);
37

38
int32_t connUpdateSessMgtMetric(int64_t connId, SSessParam* pParam);
39
//int32_t tscUpdateSessMgtMetric(STscObj* pTscObj, SSessParam* pParam);
40

41
void setQueryRequest(int64_t rId) {
112,981,501✔
42
  SRequestObj* pReq = acquireRequest(rId);
112,981,501✔
43
  if (pReq != NULL) {
112,982,186✔
44
    pReq->isQuery = true;
112,971,844✔
45
    (void)releaseRequest(rId);
112,971,844✔
46
  }
47
}
112,981,963✔
48

49
// Checks that str is usable as a bounded, non-empty string.
//
// str      string to check, may be NULL
// maxsize  largest accepted length in bytes, terminator not counted
//
// Returns true only when str is non-NULL, not empty and at most maxsize bytes long.
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (str == NULL) {
    return false;
  }

  // size_t is unsigned, so the only lower-bound case to reject is the empty string
  size_t len = strlen(str);
  return len != 0 && len <= maxsize;
}
61

62
// A user name is accepted when it is non-NULL, non-empty and at most TSDB_USER_LEN - 1 bytes long.
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}
2,693,196✔
63

64
// A password is accepted when it is non-NULL, non-empty and at most TSDB_PASSWORD_MAX_LEN bytes long.
static bool validatePassword(const char* passwd) {
  const size_t maxLen = TSDB_PASSWORD_MAX_LEN;
  return stringLengthCheck(passwd, maxLen);
}
2,692,607✔
65

66
// A database name is accepted when it is non-NULL, non-empty and at most TSDB_DB_NAME_LEN - 1 bytes long.
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}
1,177,811✔
67

68
// Builds the "user:auth:ip:port" key used to look up an application instance.
//
// user, auth  must be valid strings
// ip          may be NULL (the caller passes NULL when it falls back to the configured endpoints)
// port        port number rendered in decimal
//
// The key is formatted into a 512-byte buffer (longer input is truncated by snprintf) and
// returned as a copy made by taosStrdup(); the caller owns the copy. The result is whatever
// taosStrdup() returns, so the caller must check it for NULL.
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char key[512] = {0};
  // Passing NULL to %s is undefined behaviour. Spell out the "(null)" text that common libc
  // implementations print so that keys built for a NULL ip stay the same as before.
  const char* ipText = (ip != NULL) ? ip : "(null)";
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ipText, port);
  return taosStrdup(key);
}
73

74
// Copies src into dst, replacing the characters ", \, \b, \f, \n, \r and \t by their
// two-character backslash escape sequences.
//
// dst           destination buffer
// maxDstLength  number of bytes that may be written to dst
// src           source bytes (not required to be NUL-terminated)
// srcLength     number of bytes to read from src
//
// Returns the number of bytes written to dst. No terminator is appended. Returns 0 when
// dst or src is NULL or srcLength is 0.
//
// Output never exceeds maxDstLength bytes: conversion stops in front of the first character
// (or escape pair) that no longer fits, so a truncated result never ends in a lone backslash
// produced by this function.
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
  if (dst == NULL || src == NULL || srcLength == 0) {
    return 0;
  }

  size_t dstLength = 0;
  for (size_t i = 0; i < srcLength; ++i) {
    // second character of the escape sequence, or 0 when src[i] is copied verbatim
    char escaped = 0;
    switch (src[i]) {
      case '\"':
        escaped = '\"';
        break;
      case '\\':
        escaped = '\\';
        break;
      case '\b':
        escaped = 'b';
        break;
      case '\f':
        escaped = 'f';
        break;
      case '\n':
        escaped = 'n';
        break;
      case '\r':
        escaped = 'r';
        break;
      case '\t':
        escaped = 't';
        break;
      default:
        break;
    }

    // dstLength never exceeds maxDstLength, so the subtraction cannot wrap
    size_t needed = (escaped != 0) ? 2 : 1;
    if (needed > maxDstLength - dstLength) {
      break;
    }

    if (escaped != 0) {
      dst[dstLength++] = '\\';
      dst[dstLength++] = escaped;
    } else {
      dst[dstLength++] = src[i];
    }
  }

  return (int32_t)dstLength;
}
130

131
bool chkRequestKilled(void* param) {
2,147,483,647✔
132
  bool         killed = false;
2,147,483,647✔
133
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
134
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647✔
135
    killed = true;
×
136
  }
137

138
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
139

140
  return killed;
2,147,483,647✔
141
}
142

143
void cleanupAppInfo() {
1,177,909✔
144
  taosHashCleanup(appInfo.pInstMap);
1,177,909✔
145
  taosHashCleanup(appInfo.pInstMapByClusterId);
1,177,909✔
146
  tscInfo("cluster instance map cleaned");
1,177,909✔
147
}
1,177,909✔
148

149
static int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db,
150
                               __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType,
151
                               STscObj** pTscObj);
152

153
// Validates the connection parameters, finds or creates the application instance that
// matches (user, secret, ip, port) and then delegates to taosConnectImpl().
//
// ip        server address; when NULL the endpoints come from tsFirst/tsSecond
// user      user name, validated with validateUserName()
// pass      plain password; only used (and validated) when auth is NULL
// auth      already-encrypted secret; when non-NULL it is copied as is and pass is ignored
// totp      optional TOTP code as a decimal string in the range 0..999999, or NULL
// db        optional database name; copied and de-quoted when non-empty
// port      when non-zero, overrides the port of the first two endpoints
// connType  forwarded to taosConnectImpl()
// pObj      forwarded to taosConnectImpl()
//
// Returns TSDB_CODE_SUCCESS or an error code; the specific validation failures are
// TSDB_CODE_TSC_INVALID_USER_LENGTH, _INVALID_DB_LENGTH, _INVALID_PASS_LENGTH and
// _INVALID_TOTP_CODE.
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* totp,
                              const char* db, uint16_t port, int connType, STscObj** pObj) {
  TSC_ERR_RET(taos_init());
  if (!validateUserName(user)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
  }
  int32_t code = 0;

  char localDb[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL && strlen(db) > 0) {
    if (!validateDbName(db)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
    }

    tstrncpy(localDb, db, sizeof(localDb));
    (void)strdequote(localDb);
  }

  // either encrypt the plain password or take the caller-supplied secret unchanged
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
    }

    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

  // -1 means "no TOTP code supplied"
  int32_t totpCode = -1;
  if (totp != NULL) {
    char* endptr = NULL;
    totpCode = taosStr2Int32(totp, &endptr, 10);
    // reject empty input, trailing characters and values outside 0..999999
    if (endptr == totp || *endptr != '\0' || totpCode < 0 || totpCode > 999999) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOTP_CODE);
    }
  }

  SCorEpSet epSet = {0};
  if (ip) {
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
  } else {
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
  }

  if (port) {
    epSet.epSet.eps[0].port = port;
    epSet.epSet.eps[1].port = port;
  }

  char* key = getClusterKey(user, secretEncrypt, ip, port);
  if (NULL == key) {
    TSC_ERR_RET(terrno);
  }
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
          user, db, key);
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
  }
  // for (int32_t i = 0; i < epSet.epSet.numOfEps; i++) {
  //   if ((code = taosValidFqdn(tsEnableIpv6, epSet.epSet.eps[i].fqdn)) != 0) {
  //     taosMemFree(key);
  //     tscError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6,
  //              epSet.epSet.eps[i].fqdn, tstrerror(code));
  //     TSC_ERR_RET(code);
  //   }
  // }

  SAppInstInfo** pInst = NULL;
  code = taosThreadMutexLock(&appInfo.mutex);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    // key is still owned by this function and the _return cleanup is bypassed here,
    // so it has to be freed before the early return
    taosMemoryFree(key);
    TSC_ERR_RET(code);
  }

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
  SAppInstInfo* p = NULL;
  if (pInst == NULL) {
    // no instance for this key yet: create one and register it in the map
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
    if (NULL == p) {
      TSC_ERR_JRET(terrno);
    }
    p->mgmtEp = epSet;
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
    if (TSDB_CODE_SUCCESS != code) {
      // NOTE(review): qnodeMutex was initialised above and is not destroyed on this path - confirm whether it should be
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    // ownership of key moves to the instance; clearing the local keeps _return from freeing it
    p->instKey = key;
    key = NULL;
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);

    pInst = &p;
  } else {
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
    }
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    (void)taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    return code;
  } else {
    code = taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
      return code;
    }
    SSessParam pPara = {.type = SESSION_PER_USER, .value = 1};
    code = sessMgtUpdateUserMetric((char*)user, &pPara);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("failed to connect with user:%s, code:%s", user, tstrerror(code));
      return code;
    }
    return taosConnectImpl(user, &secretEncrypt[0], totpCode, localDb, NULL, NULL, *pInst, connType, pObj);
  }
}
292

293
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
294
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
295
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
296
//     return *ppAppInstInfo;
297
//   } else {
298
//     return NULL;
299
//   }
300
// }
301

302
// Destroys the semaphore embedded in param and frees param itself.
// A NULL param is ignored; a failing semaphore destroy is logged and the memory is still freed.
void freeQueryParam(SSyncQueryParam* param) {
  if (NULL != param) {
    if (tsem_destroy(&param->sem) != TSDB_CODE_SUCCESS) {
      tscError("failed to destroy semaphore in freeQueryParam");
    }

    taosMemoryFree(param);
  }
}
309

310
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
638,827,349✔
311
                     SRequestObj** pRequest, int64_t reqid) {
312
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
638,827,349✔
313
  if (TSDB_CODE_SUCCESS != code) {
638,825,721✔
314
    tscError("failed to malloc sqlObj, %s", sql);
×
315
    return code;
×
316
  }
317

318
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
638,825,721✔
319
  if ((*pRequest)->sqlstr == NULL) {
638,822,624✔
320
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
321
    destroyRequest(*pRequest);
×
322
    *pRequest = NULL;
×
323
    return terrno;
×
324
  }
325

326
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
638,815,277✔
327
  (*pRequest)->sqlstr[sqlLen] = 0;
638,836,258✔
328
  (*pRequest)->sqlLen = sqlLen;
638,834,935✔
329
  (*pRequest)->validateOnly = validateSql;
638,834,936✔
330
  (*pRequest)->stmtBindVersion = 0;
638,834,727✔
331

332
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
638,830,158✔
333

334
  STscObj* pTscObj = (*pRequest)->pTscObj;
638,831,875✔
335
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
638,830,578✔
336
                             sizeof((*pRequest)->self));
337
  if (err) {
638,830,790✔
338
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
339
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
340
    destroyRequest(*pRequest);
×
341
    *pRequest = NULL;
×
342
    return terrno;
×
343
  }
344

345
  (*pRequest)->allocatorRefId = -1;
638,830,790✔
346
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
638,830,683✔
347
    if (TSDB_CODE_SUCCESS !=
187,654,683✔
348
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
187,640,898✔
349
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
350
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
351
      destroyRequest(*pRequest);
×
352
      *pRequest = NULL;
×
353
      return terrno;
×
354
    }
355
  }
356

357
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
638,838,395✔
358
  return TSDB_CODE_SUCCESS;
638,826,311✔
359
}
360

361
// Builds a sub-request for sql on the same connection as pRequest and links the two:
// pRequest records the new request as its previous one, and the new request records
// pRequest as both its next and its user request.
//
// Returns the result of buildRequest(); the links are only set up on success.
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
  int32_t code =
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pPrev = *pNewRequest;
  pRequest->relation.prevRefId = pPrev->self;
  pPrev->relation.nextRefId = pRequest->self;
  pPrev->relation.userRefId = pRequest->self;
  pPrev->isSubReq = true;

  return code;
}
372

373
// Parses the statement stored in pRequest into *pQuery.
//
// pRequest    request holding the statement text, message buffer and connection
// topicQuery  forwarded to the parser context
// pQuery      receives the parsed query
// pStmtCb     forwarded to the parser context
//
// When parsing succeeds and the query has a result set, the result schema and precision are
// stored in the request. When the final code is success or NEED_CLIENT_HANDLE_ERROR(code)
// holds, the db/table/target-table lists are swapped from the query into the request.
//
// Returns the resulting code.
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj*      pTscObj = pRequest->pTscObj;
  SAppInstInfo* pAppInfo = pTscObj->pAppInfo;

  SParseContext cxt = {
      .requestId = pRequest->requestId,
      .requestRid = pRequest->self,
      .acctId = pTscObj->acctId,
      .db = pRequest->pDb,
      .topicQuery = topicQuery,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pAppInfo->pTransporter,
      .pStmtCb = pStmtCb,
      .pUser = pTscObj->user,
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
      .enableSysInfo = pTscObj->sysInfo,
      .svrVer = pTscObj->sVer,
      .nodeOffline = (pAppInfo->onlineDnodes < pAppInfo->totalDnodes),
      .stmtBindVersion = pRequest->stmtBindVersion,
      .setQueryFp = setQueryRequest,
      .timezone = pTscObj->optionInfo.timezone,
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
  };

  cxt.mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp);
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = qParseSql(&cxt, pQuery);
  if (TSDB_CODE_SUCCESS == code && (*pQuery)->haveResultSet) {
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    code = setResSchemaInfo(pResInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols, (*pQuery)->pResExtSchema,
                            pRequest->stmtBindVersion > 0);
    // the precision is recorded regardless of the schema result
    setResPrecision(pResInfo, (*pQuery)->precision);
  }

  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
  }

  taosArrayDestroy(cxt.pTableMetaPos);
  taosArrayDestroy(cxt.pTableVgroupPos);

  return code;
}
425

426
// Runs a locally executed command through qExecCommand() and, when it succeeds and
// yields a response, stores that response as the request's result.
//
// Returns the code of qExecCommand(), or of setQueryResultFromRsp() when that is called.
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  STscObj*           pTscObj = pRequest->pTscObj;
  SRetrieveTableRsp* pRsp = NULL;
  int8_t             biMode = atomic_load_8(&pTscObj->biMode);

  int32_t code =
      qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode, pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS != code || NULL == pRsp) {
    return code;
  }

  SReqResultInfo* pResInfo = &pRequest->body.resInfo;
  return setQueryResultFromRsp(pResInfo, pRsp, pResInfo->convertUcs4, pRequest->stmtBindVersion > 0);
}
438

439
// Sends the command message of a DDL query to the server and waits on the request's
// response semaphore.
//
// Returns TSDB_CODE_SUCCESS immediately when the query carries no command message
// (e.g. "drop table if exists not_exists_table"); otherwise the first error from
// sending or waiting, or TSDB_CODE_SUCCESS.
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (NULL == pCmd) {
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pCmd->epSet, NULL, pSendInfo));
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
  return TSDB_CODE_SUCCESS;
}
458

459
// Returns the application instance of the connection that owns pRequest.
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
  STscObj* pTscObj = pRequest->pTscObj;
  return pTscObj->pAppInfo;
}
1,196,816,103✔
460

461
// Executes a local command and reports the outcome through doRequestCallback().
//
// For validate-only requests the callback is invoked with 0 and nothing is executed.
// Otherwise the command runs through qExecCommand(); a returned response is stored as the
// request's result, the final code is saved in pRequest->code and passed to the callback.
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  STscObj*           pTscObj = pRequest->pTscObj;
  SReqResultInfo*    pResultInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRsp = NULL;

  int32_t code = qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, atomic_load_8(&pTscObj->biMode),
                              pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
    code = setQueryResultFromRsp(pResultInfo, pRsp, pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
  }

  pRequest->code = code;
  if (TSDB_CODE_SUCCESS == pRequest->code) {
    tscDebug(
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
  } else {
    // a failed command exposes no rows
    pResultInfo->numOfRows = 0;
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
             pRequest->requestId);
  }

  doRequestCallback(pRequest, code);
}
490

491
// Sends the command message of a DDL query to the server without waiting for the reply.
//
// The request callback is invoked with 0, and TSDB_CODE_SUCCESS returned, when the request is
// validate-only or the query carries no command message (e.g. "drop table if exists
// not_exists_table"). When sending fails the callback is invoked with the error.
//
// Returns the result of asyncSendMsgToServer() in the sending case.
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly || NULL == pQuery->pCmdMsg) {
    doRequestCallback(pRequest, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pCmd->epSet, NULL, pSendInfo);
  if (code) {
    doRequestCallback(pRequest, code);
  }
  return code;
}
517

518
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
354,838✔
519
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
354,838✔
520
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
354,838✔
521

522
  if (node1->load < node2->load) {
354,838✔
523
    return -1;
×
524
  }
525

526
  return node1->load > node2->load;
354,838✔
527
}
528

529
// Replaces the qnode list cached in pInfo with a sorted copy of pNodeList.
//
// pInfo      application instance whose cache is updated, guarded by pInfo->qnodeMutex
// pNodeList  new list; when NULL the cache is only cleared
//
// The previous list is always destroyed. The copy is sorted with compareQueryNodeLoad().
//
// Returns TSDB_CODE_SUCCESS, a lock/unlock error, or terrno when the list could not be
// duplicated (the cache is then left empty).
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  int32_t code = TSDB_CODE_SUCCESS;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    taosArrayDestroy(pInfo->pQnodeList);
    pInfo->pQnodeList = NULL;
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
  }

  if (pNodeList) {
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
    if (NULL == pInfo->pQnodeList) {
      // do not sort or measure a list that was never created; report the failure instead
      code = terrno;
    } else {
      taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
      tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
               taosArrayGetSize(pInfo->pQnodeList));
    }
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));

  return code;
}
547

548
// Tells the caller whether a qnode list still has to be obtained for this request.
//
// pRequest  request whose application instance is inspected
// required  set to false when tsQueryPolicy is QUERY_POLICY_VNODE or QUERY_POLICY_CLIENT;
//           otherwise set to true exactly when the instance has no cached qnode list
//
// Returns TSDB_CODE_SUCCESS, or the error from locking/unlocking pInfo->qnodeMutex
// (in which case *required is false if the lock failed).
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
  *required = false;
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;

  // the cached list is only read under qnodeMutex
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  *required = (NULL == pInfo->pQnodeList);
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
  return TSDB_CODE_SUCCESS;
}
563

564
// Provides a qnode list for pRequest.
//
// pRequest   request whose application instance and catalog are used
// pNodeList  receives the list
//
// When the application instance has a cached list, *pNodeList receives a copy of it.
// When *pNodeList is NULL after that, a new list is fetched from the catalog with
// catalogGetQnodeList() and, on success, also stored in the cache through updateQnodeList().
//
// NOTE(review): *pNodeList is read without being written first when there is no cached
// list, so callers presumably pass it initialised to NULL - confirm.
//
// Returns TSDB_CODE_SUCCESS or an error code.
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
  if (NULL == *pNodeList) {
    SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
      // check the array that was just allocated, not the out-parameter itself
      if (NULL == *pNodeList) {
        TSC_ERR_RET(terrno);
      }
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
    }

    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
      code = updateQnodeList(pInfo, *pNodeList);
    }
  }

  return code;
}
595

596
// Creates the query plan for an already parsed query.
//
// pRequest   request; its type is set to the query's message type
// pQuery     parsed query supplying the AST root and the showRewrite flag
// pPlan      receives the plan
// pNodeList  node list forwarded to qCreateQueryPlan()
//
// Returns the result of qCreateQueryPlan().
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
  STscObj*      pTscObj = pRequest->pTscObj;

  SPlanContext cxt = {
      .queryId = pRequest->requestId,
      .acctId = pTscObj->acctId,
      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
      .pAstRoot = pQuery->pRoot,
      .showRewrite = pQuery->showRewrite,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pUser = pTscObj->user,
      .timezone = pTscObj->optionInfo.timezone,
      .sysInfo = pTscObj->sysInfo,
  };

  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
613

614
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
113,449,656✔
615
                         const SExtSchema* pExtSchema, bool isStmt) {
616
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
113,449,656✔
617
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
618
    return TSDB_CODE_INVALID_PARA;
×
619
  }
620

621
  pResInfo->numOfCols = numOfCols;
113,450,308✔
622
  if (pResInfo->fields != NULL) {
113,449,944✔
623
    taosMemoryFree(pResInfo->fields);
18,755✔
624
  }
625
  if (pResInfo->userFields != NULL) {
113,448,533✔
626
    taosMemoryFree(pResInfo->userFields);
18,755✔
627
  }
628
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
113,450,511✔
629
  if (NULL == pResInfo->fields) return terrno;
113,447,477✔
630
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
113,448,420✔
631
  if (NULL == pResInfo->userFields) {
113,448,550✔
632
    taosMemoryFree(pResInfo->fields);
×
633
    return terrno;
×
634
  }
635
  if (numOfCols != pResInfo->numOfCols) {
113,449,369✔
636
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
637
    return TSDB_CODE_FAILED;
×
638
  }
639

640
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
688,832,582✔
641
    pResInfo->fields[i].type = pSchema[i].type;
575,381,086✔
642

643
    pResInfo->userFields[i].type = pSchema[i].type;
575,382,098✔
644
    // userFields must convert to type bytes, no matter isStmt or not
645
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
575,383,029✔
646
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
575,381,951✔
647
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
575,379,918✔
648
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
1,409,940✔
649
    }
650

651
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
575,379,980✔
652
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
575,382,429✔
653
  }
654
  return TSDB_CODE_SUCCESS;
113,451,342✔
655
}
656

657
// Stores precision in pResInfo when it is one of the milli/micro/nano time precisions;
// any other value is ignored and the stored precision is left untouched.
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  if (precision == TSDB_TIME_PRECISION_MILLI || precision == TSDB_TIME_PRECISION_MICRO ||
      precision == TSDB_TIME_PRECISION_NANO) {
    pResInfo->precision = precision;
  }
}
665

666
// Builds the execution node list for the vnode/client query policies.
//
// pRequest    request, used for logging
// pNodeList   receives the new list on success
// pMnodeList  fallback entries, used only when no vgroup entry was collected
// pDbVgList   array of per-database vgroup arrays (NULL or empty entries are skipped)
//
// Every vgroup contributes one SQueryNodeLoad carrying its vgId and epSet. When that yields
// nothing, the mnode list is copied instead; when that is empty too, an empty list is returned.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_RANGE when an array element cannot be fetched,
// or terrno when an allocation/append fails. The partially built list is destroyed on failure.
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == nodeList) {
    return terrno;
  }
  const char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";

  int32_t dbNum = taosArrayGetSize(pDbVgList);
  for (int32_t dbIdx = 0; dbIdx < dbNum; ++dbIdx) {
    SArray* pVg = taosArrayGetP(pDbVgList, dbIdx);
    if (NULL == pVg) {
      continue;
    }

    // an empty vgroup array simply contributes nothing
    int32_t vgNum = taosArrayGetSize(pVg);
    for (int32_t vgIdx = 0; vgIdx < vgNum; ++vgIdx) {
      SVgroupInfo* pVgInfo = taosArrayGet(pVg, vgIdx);
      if (NULL == pVgInfo) {
        taosArrayDestroy(nodeList);
        return TSDB_CODE_OUT_OF_RANGE;
      }

      SQueryNodeLoad load = {0};
      load.addr.nodeId = pVgInfo->vgId;
      load.addr.epSet = pVgInfo->epSet;

      if (NULL == taosArrayPush(nodeList, &load)) {
        taosArrayDestroy(nodeList);
        return terrno;
      }
    }
  }

  int32_t vnodeNum = taosArrayGetSize(nodeList);
  if (vnodeNum > 0) {
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
  } else {
    // no vnode found: fall back to the mnode list
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
    } else {
      void* pData = taosArrayGet(pMnodeList, 0);
      if (NULL == pData) {
        taosArrayDestroy(nodeList);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
        taosArrayDestroy(nodeList);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
    }
  }

  *pNodeList = nodeList;

  return TSDB_CODE_SUCCESS;
}
731

732
// Builds the execution node list for the qnode query policy.
//
// pRequest    request, used for logging
// pNodeList   receives the new list on success
// pMnodeList  fallback entries, used only when pQnodeList is empty
// pQnodeList  preferred entries
//
// The qnode list is copied when it has entries; otherwise the mnode list is copied; when both
// are empty an empty list is returned.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_RANGE when the first element of the chosen list
// cannot be fetched, or terrno when an allocation/append fails. The new list is destroyed on failure.
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == nodeList) {
    return terrno;
  }

  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
  if (qNodeNum > 0) {
    void* pQnodes = taosArrayGet(pQnodeList, 0);
    if (NULL == pQnodes) {
      taosArrayDestroy(nodeList);
      return TSDB_CODE_OUT_OF_RANGE;
    }
    if (NULL == taosArrayAddBatch(nodeList, pQnodes, qNodeNum)) {
      taosArrayDestroy(nodeList);
      return terrno;
    }
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
  } else {
    // no qnode available: fall back to the mnode list
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
    } else {
      void* pMnodes = taosArrayGet(pMnodeList, 0);
      if (NULL == pMnodes) {
        taosArrayDestroy(nodeList);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(nodeList, pMnodes, mnodeNum)) {
        taosArrayDestroy(nodeList);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
    }
  }

  *pNodeList = nodeList;

  return TSDB_CODE_SUCCESS;
}
777

778
void freeVgList(void* list) {
6,287,734✔
779
  SArray* pList = *(SArray**)list;
6,287,734✔
780
  taosArrayDestroy(pList);
6,289,051✔
781
}
6,288,855✔
782

783
/*
 * Builds the execution node list for an asynchronous query according to the
 * global tsQueryPolicy.
 *
 * pRequest    - request being scheduled
 * pNodeList   - [out] receives the node list built by the policy helper
 * pMnodeList  - mnode candidates forwarded to the policy helper
 * pResultMeta - optional pre-fetched catalog metadata; when NULL the vgroup
 *               lists / qnode list are obtained from the catalog / app instance
 *
 * Ownership: when pResultMeta is supplied, pDbVgList only borrows the vgroup
 * lists (fp stays NULL so the elements are not freed). When the lists are
 * fetched here, fp is freeVgList and the elements are destroyed on exit.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_APP_ERROR for an
 * unknown policy.
 */
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  FDelete fp = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      if (pResultMeta) {
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
        for (int32_t i = 0; i < dbNum; ++i) {
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
          if (pRes->code || NULL == pRes->pRes) {
            // skip databases whose vgroup lookup failed or returned nothing
            continue;
          }

          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
            code = terrno;
            goto _return;
          }
        }
      } else {
        // lists fetched below are owned by pDbVgList and must be freed with it
        fp = freeVgList;

        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
        if (dbNum > 0) {
          SCatalog*     pCtg = NULL;
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
          code = catalogGetHandle(pInst->clusterId, &pCtg);
          if (code != TSDB_CODE_SUCCESS) {
            goto _return;
          }

          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
          if (NULL == pDbVgList) {
            code = terrno;
            goto _return;
          }
          for (int32_t i = 0; i < dbNum; ++i) {
            SArray*          pVgList = NULL;
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                     .requestId = pRequest->requestId,
                                     .requestObjRefId = pRequest->self,
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

            // catalogGetDBVgList will handle dbFName == null.
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
            if (code) {
              goto _return;
            }

            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
              code = terrno;
              // the list never made it into pDbVgList, so free it here to avoid a leak
              taosArrayDestroy(pVgList);
              goto _return;
            }
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
        if (pRes->code) {
          pQnodeList = NULL;
        } else {
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            goto _return;
          }
        }
      } else {
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
        if (pInst->pQnodeList) {
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            // release the lock before bailing out; the dup error is the one reported
            (void)taosThreadMutexUnlock(&pInst->qnodeMutex);
            goto _return;
          }
        }
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
      }

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      // nothing has been allocated yet on this path
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:
  taosArrayDestroyEx(pDbVgList, fp);
  taosArrayDestroy(pQnodeList);

  return code;
}
892

893
/*
 * Builds the execution node list for a synchronous query according to the
 * global tsQueryPolicy.
 *
 * pRequest   - request being scheduled; its dbList names the databases involved
 * pNodeList  - [out] receives the node list built by the policy helper
 * pMnodeList - mnode candidates forwarded to the policy helper
 *
 * The per-database vgroup lists fetched from the catalog are owned by
 * pDbVgList and released (via freeVgList) before returning.
 *
 * Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_APP_ERROR for an
 * unknown policy.
 */
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
      if (dbNum > 0) {
        SCatalog*     pCtg = NULL;
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        code = catalogGetHandle(pInst->clusterId, &pCtg);
        if (code != TSDB_CODE_SUCCESS) {
          goto _return;
        }

        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        for (int32_t i = 0; i < dbNum; ++i) {
          SArray*          pVgList = NULL;
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                   .requestId = pRequest->requestId,
                                   .requestObjRefId = pRequest->self,
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

          // catalogGetDBVgList will handle dbFName == null.
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
          if (code) {
            goto _return;
          }

          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
            code = terrno;
            // the list never made it into pDbVgList, so free it here to avoid a leak
            taosArrayDestroy(pVgList);
            goto _return;
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      // nothing has been allocated yet on this path
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:

  taosArrayDestroyEx(pDbVgList, freeVgList);
  taosArrayDestroy(pQnodeList);

  return code;
}
958

959
/*
 * Runs a query plan synchronously through the scheduler and stores the
 * execution result on the request.
 *
 * pRequest  - request to execute; its execRes, code and (for write requests)
 *             numOfRows are updated
 * pDag      - query plan to execute
 * pNodeList - candidate execution nodes
 *
 * Returns the final request code; terrno is set to the same value.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;

  SExecResult      execRes = {0};
  SRequestConnInfo connInfo = {
      .pTrans = pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
  SSchedulerReq schedReq = {
      .syncReq = true,
      .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
      .pConn = &connInfo,
      .pNodeList = pNodeList,
      .pDag = pDag,
      .sql = pRequest->sqlstr,
      .startTs = pRequest->metric.start,
      .execFp = NULL,
      .cbParam = NULL,
      .chkKillFp = chkRequestKilled,
      .chkKillParam = (void*)pRequest->self,
      .pExecRes = &execRes,
      .source = pRequest->source,
      .pWorkerCb = getTaskPoolWorkerCb(),
  };

  int32_t code = schedulerExecJob(&schedReq, &pRequest->body.queryJob);

  // replace whatever result the request held with the one just produced
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
  (void)memcpy(&pRequest->body.resInfo.execRes, &execRes, sizeof(execRes));

  if (TSDB_CODE_SUCCESS != code) {
    schedulerFreeJob(&pRequest->body.queryJob, 0);

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  bool isSubmit = (TDMT_VND_SUBMIT == pRequest->type);
  bool isWriteReq =
      isSubmit || (TDMT_VND_DELETE == pRequest->type) || (TDMT_VND_CREATE_TABLE == pRequest->type);
  if (isWriteReq) {
    pRequest->body.resInfo.numOfRows = execRes.numOfRows;
    if (isSubmit) {
      // account inserted rows in the per-cluster summary
      SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, execRes.numOfRows);
    }

    // write requests have nothing left to fetch, so the job can go now
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  pRequest->code = execRes.code;
  terrno = execRes.code;
  return pRequest->code;
}
1012

1013
/*
 * Handles the result of a submit (insert) request: for every auto-created
 * table reported in the response that carries metadata, pushes that metadata
 * to the catalog via handleCreateTbExecRes.
 *
 * res is interpreted as an SSubmitRsp2*. pRequest and epset are not used here.
 * Returns TSDB_CODE_SUCCESS, or the first error reported by TSC_ERR_RET.
 */
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SSubmitRsp2* pSubmitRsp = (SSubmitRsp2*)res;
  if (pSubmitRsp->aCreateTbRsp == NULL) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t numCreated = taosArrayGetSize(pSubmitRsp->aCreateTbRsp);
  for (int32_t idx = 0; idx < numCreated; ++idx) {
    SVCreateTbRsp* pCreateRsp = (SVCreateTbRsp*)taosArrayGet(pSubmitRsp->aCreateTbRsp, idx);
    if (NULL == pCreateRsp->pMeta) {
      continue;
    }
    TSC_ERR_RET(handleCreateTbExecRes(pCreateRsp->pMeta, pCatalog));
  }

  return TSDB_CODE_SUCCESS;
}
1030

1031
/*
 * Handles the result of a query: converts the returned table version infos
 * (res is an SArray of STbVerInfo) into STbSVersion entries and asks the
 * catalog to check them against its cached table metadata.
 *
 * Returns TSDB_CODE_SUCCESS when there is nothing to check, otherwise the
 * result of catalogChkTbMetaVersion or the error hit while building the list.
 */
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SArray* pVerInfos = (SArray*)res;
  int32_t numTables = taosArrayGetSize(pVerInfos);
  if (numTables <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(numTables, sizeof(STbSVersion));
  if (pVersions == NULL) {
    return terrno;
  }

  int32_t code = 0;
  for (int32_t idx = 0; idx < numTables; ++idx) {
    STbVerInfo* pInfo = taosArrayGet(pVerInfos, idx);
    if (pInfo == NULL) {
      code = terrno;
      goto _return;
    }

    STbSVersion entry = {0};
    entry.tbFName = pInfo->tbFName;
    entry.sver = pInfo->sversion;
    entry.tver = pInfo->tversion;
    entry.rver = pInfo->rversion;

    if (taosArrayPush(pVersions, &entry) == NULL) {
      code = terrno;
      goto _return;
    }
  }

  SRequestConnInfo connInfo = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = *epset};

  code = catalogChkTbMetaVersion(pCatalog, &connInfo, pVersions);

_return:

  taosArrayDestroy(pVersions);
  return code;
}
1071

1072
/*
 * Handles the result of an alter-table request: res is treated as an
 * STableMetaRsp* and forwarded to catalogUpdateTableMeta.
 */
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}
1075

1076
/*
 * Handles the result of a create-table request: res is treated as an
 * STableMetaRsp* and forwarded to catalogAsyncUpdateTableMeta.
 */
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogAsyncUpdateTableMeta(pCatalog, pMetaRsp);
}
1079

1080
/*
 * Post-processes the execution result stored in
 * pRequest->body.resInfo.execRes, dispatching on the result's message type to
 * the matching catalog-update handler.
 *
 * Returns pRequest->code when the request holds no result payload; otherwise
 * the handler's result, the catalogGetHandle error, or TSDB_CODE_APP_ERROR for
 * an unexpected message type. For TDMT_VND_CREATE_TABLE every table is
 * processed and the first failure (if any) is reported.
 */
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
  if (NULL == pRequest->body.resInfo.execRes.res) {
    return pRequest->code;
  }

  SCatalog*     pCatalog = NULL;
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);

  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
  if (code) {
    return code;
  }

  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
  SExecResult* pRes = &pRequest->body.resInfo.execRes;

  switch (pRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
      break;
    }
    case TDMT_VND_CREATE_TABLE: {
      SArray* pList = (SArray*)pRes->res;
      int32_t num = taosArrayGetSize(pList);
      for (int32_t i = 0; i < num; ++i) {
        void* res = taosArrayGetP(pList, i);
        // handleCreateTbExecRes will handle res == null
        int32_t tbCode = handleCreateTbExecRes(res, pCatalog);
        // keep processing the remaining tables, but do not let a later
        // success hide an earlier failure
        if (TSDB_CODE_SUCCESS == code) {
          code = tbCode;
        }
      }
      break;
    }
    case TDMT_MND_CREATE_STB: {
      code = handleCreateTbExecRes(pRes->res, pCatalog);
      break;
    }
    case TDMT_VND_SUBMIT: {
      // account the inserted bytes in the per-cluster summary
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);

      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
      break;
    }
    default:
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
               pRequest->type, pRequest->requestId);
      code = TSDB_CODE_APP_ERROR;
      break;
  }

  return code;
}
1135

1136
/*
 * Returns the fileProcessing flag of a vnode-modify statement; any other
 * statement type yields false.
 */
static bool incompletaFileParsing(SNode* pStmt) {
  if (QUERY_NODE_VNODE_MODIFY_STMT == nodeType(pStmt)) {
    return ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
  }
  return false;
}
1139

1140
/*
 * Continues parsing and planning of a post (follow-up) query once its
 * sub-query has produced pBlock, then hands the outcome to
 * handleQueryAnslyseRes.
 *
 * pRequest - the follow-up request; its wrapper supplies the parse context
 * pBlock   - sub-query result passed to qContinueParsePostQuery (callers in
 *            this file may pass NULL)
 *
 * The first error encountered (acquire, parse, plan, or release) is the one
 * reported.
 */
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;

  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    int64_t analyseStart = taosGetTimestampUs();
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
  }

  // the release result must not mask an earlier parse/plan failure
  int32_t releaseCode = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    code = releaseCode;
  }
  handleQueryAnslyseRes(pWrapper, NULL, code);
}
×
1157

1158
/*
 * Delivers the outcome of pRequest to the client.
 *
 * If the request has no separate user request (userRefId is 0 or refers to
 * itself) its own callback is invoked. Otherwise the user request is acquired,
 * inherits pRequest's code, and its callback is invoked instead. A missing
 * user request is only logged.
 */
void returnToUser(SRequestObj* pRequest) {
  bool ownsUserCallback = (0 == pRequest->relation.userRefId) || (pRequest->self == pRequest->relation.userRefId);
  if (ownsUserCallback) {
    // return to client
    doRequestCallback(pRequest, pRequest->code);
    return;
  }

  SRequestObj* pOwner = acquireRequest(pRequest->relation.userRefId);
  if (NULL == pOwner) {
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.userRefId, pRequest->requestId);
    return;
  }

  // propagate the result to the request the user is actually waiting on
  pOwner->code = pRequest->code;
  // return to client
  doRequestCallback(pOwner, pOwner->code);
  (void)releaseRequest(pRequest->relation.userRefId);
}
1177

1178
/*
 * Materializes up to numOfRows rows of a result set into a newly created data
 * block.
 *
 * pRes      - result set to read from
 * numOfRows - number of rows to fetch and copy
 * pBlock    - [out] receives the new block on success; set to NULL when the
 *             block was created and then destroyed because of a later failure
 *
 * Each row must provide non-NULL values in its first three columns; they are
 * read as an int64 timestamp, an int32 vgroup id and an int64 vgroup version.
 * The largest timestamp seen is stored as the block's window end key.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR for a missing or
 * malformed row, or the error of the failing block operation.
 */
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
  int64_t     lastTs = 0;
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
  int32_t     numOfFields = taos_num_fields(pRes);

  int32_t code = createDataBlock(pBlock);
  if (code) {
    return code;
  }

  for (int32_t i = 0; i < numOfFields; ++i) {
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(*pBlock, numOfRows);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfRows; ++i) {
    TAOS_ROW pRow = taos_fetch_row(pRes);
    // guard against a short result set and against rows with fewer than the
    // three columns dereferenced below
    if (NULL == pRow || numOfFields < 3 || NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
      tscError("invalid data from vnode");
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto _error;
    }
    int64_t ts = *(int64_t*)pRow[0];
    if (lastTs < ts) {
      lastTs = ts;
    }

    for (int32_t j = 0; j < numOfFields; ++j) {
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
      if (TSDB_CODE_SUCCESS != code) {
        goto _error;
      }
    }

    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
            *(int64_t*)pRow[2]);
  }

  (*pBlock)->info.window.ekey = lastTs;
  (*pBlock)->info.rows = numOfRows;

  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
  return TSDB_CODE_SUCCESS;

_error:
  // do not leave the caller holding a pointer to the destroyed block
  blockDataDestroy(*pBlock);
  *pBlock = NULL;
  return code;
}
1234

1235
/*
 * Fetch callback for a sub-query whose rows feed a follow-up query.
 *
 * res is the sub-query request itself. On any failure the error is returned
 * to the user; otherwise the fetched rows are turned into a data block and
 * passed to the follow-up request referenced by relation.nextRefId. The block
 * is destroyed before returning. param is not used.
 */
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
  SRequestObj* pSubReq = (SRequestObj*)res;
  if (pSubReq->code) {
    returnToUser(pSubReq);
    return;
  }

  SSDataBlock* pResultBlock = NULL;
  pSubReq->code = createResultBlock(res, rowNum, &pResultBlock);
  if (pSubReq->code != TSDB_CODE_SUCCESS) {
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pSubReq->self, pSubReq->requestId,
             tstrerror(pSubReq->code));
    returnToUser(pSubReq);
    return;
  }

  SRequestObj* pFollowUp = acquireRequest(pSubReq->relation.nextRefId);
  if (NULL == pFollowUp) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pSubReq->self,
             pSubReq->relation.nextRefId, pSubReq->requestId);
  } else {
    continuePostSubQuery(pFollowUp, pResultBlock);
    (void)releaseRequest(pSubReq->relation.nextRefId);
  }

  blockDataDestroy(pResultBlock);
}
1262

1263
/*
 * Moves on from a finished sub-query to its follow-up query.
 *
 * When the sub-query produced a query result, its rows are fetched
 * asynchronously and postSubQueryFetchCb takes over. Otherwise the follow-up
 * request (relation.nextRefId) is continued directly with no input block; a
 * missing follow-up request is only logged.
 */
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
  SRequestObj* pSubReq = pWrapper->pRequest;
  if (TD_RES_QUERY(pSubReq)) {
    taosAsyncFetchImpl(pSubReq, postSubQueryFetchCb, pWrapper);
    return;
  }

  SRequestObj* pFollowUp = acquireRequest(pSubReq->relation.nextRefId);
  if (NULL == pFollowUp) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pSubReq->self,
             pSubReq->relation.nextRefId, pSubReq->requestId);
    return;
  }

  continuePostSubQuery(pFollowUp, NULL);
  (void)releaseRequest(pSubReq->relation.nextRefId);
}
1279

1280
// TODO: refactor the error code management
1281
/*
 * Scheduler completion callback for asynchronously executed queries.
 *
 * pResult - execution result; when non-NULL it is copied into the request and
 *           then freed here
 * param   - the SSqlCallbackWrapper of the request
 * code    - execution status reported by the scheduler
 *
 * Steps, in order: record the result on the request, update row counters for
 * write requests, retry the whole query when the error is one the client can
 * handle, refresh catalog metadata, continue CSV-based inserts that are not
 * finished yet, and finally either chain to the follow-up query or invoke the
 * user callback.
 */
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
  SSqlCallbackWrapper* pWrapper = param;
  SRequestObj*         pRequest = pWrapper->pRequest;
  STscObj*             pTscObj = pRequest->pTscObj;

  pRequest->code = code;
  if (NULL != pResult) {
    // replace any previous result held by the request
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
  }

  int32_t reqType = pRequest->type;
  bool    isSubmit = (TDMT_VND_SUBMIT == reqType);
  if (isSubmit || TDMT_VND_DELETE == reqType || TDMT_VND_CREATE_TABLE == reqType) {
    if (NULL != pResult) {
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;

      // record the insert rows
      if (isSubmit) {
        SAppClusterSummary* pSummary = &pTscObj->pAppInfo->summary;
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, pResult->numOfRows);
      }
    }
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  taosMemoryFree(pResult);
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
           pRequest->requestId);

  bool clientRetry = (TSDB_CODE_SUCCESS != code) && NEED_CLIENT_HANDLE_ERROR(code) && (NULL != pRequest->sqlstr) &&
                     (0 == pRequest->stmtBindVersion);
  if (clientRetry) {
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
    // drop the cached metadata of the target tables before retrying
    if (removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)) != TSDB_CODE_SUCCESS) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
    restartAsyncQuery(pRequest, code);
    return;
  }

  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    if (removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)) != TSDB_CODE_SUCCESS) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
  }

  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;

  // a failure while post-processing the result only matters if execution itself succeeded
  int32_t rspCode = handleQueryExecRsp(pRequest);
  if (TSDB_CODE_SUCCESS == pRequest->code && TSDB_CODE_SUCCESS != rspCode) {
    pRequest->code = rspCode;
  }

  if (TSDB_CODE_SUCCESS == pRequest->code && NULL != pRequest->pQuery &&
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
    continueInsertFromCsv(pWrapper, pRequest);
    return;
  }

  if (pRequest->relation.nextRefId) {
    handlePostSubQuery(pWrapper);
    return;
  }

  destorySqlCallbackWrapper(pWrapper);
  pRequest->pWrapper = NULL;

  // return to client
  // NOTE(review): the scheduler's `code` is passed here, not pRequest->code,
  // which may have been replaced by rspCode above - confirm this is intended.
  doRequestCallback(pRequest, code);
}
1350

1351
/*
 * Executes a parsed query synchronously according to its execution mode.
 *
 * pRequest  - request to run; its code, stmtType, execMode and result are updated
 * pQuery    - parsed query
 * keepQuery - when false, pQuery is destroyed once execution has been attempted
 * res       - optional [out]; when non-NULL it takes over the request's
 *             execution result pointer, which is then cleared on the request
 */
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
  int32_t code = 0;

  if (pQuery->pRoot) {
    pRequest->stmtType = pQuery->pRoot->type;
  }

  // count insert/select requests once; retries are not counted again
  if (pQuery->pRoot && !pRequest->inRetry) {
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
    }
  }

  pRequest->body.execMode = pQuery->execMode;
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      if (pRequest->validateOnly) {
        break;
      }
      if (NULL == pQuery->pRoot) {
        terrno = TSDB_CODE_INVALID_PARA;
        code = terrno;
      } else {
        code = execLocalCmd(pRequest, pQuery);
      }
      break;
    case QUERY_EXEC_MODE_RPC:
      if (!pRequest->validateOnly) {
        code = execDdlQuery(pRequest, pQuery);
      }
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pMnodes = taosArrayInit(4, sizeof(SQueryNodeLoad));
      if (NULL == pMnodes) {
        code = terrno;
        break;
      }

      SQueryPlan* pPlan = NULL;
      code = getPlan(pRequest, pQuery, &pPlan, pMnodes);
      if (TSDB_CODE_SUCCESS == code) {
        pRequest->body.subplanNum = pPlan->numOfSubplans;
      }

      if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
        SArray* pExecNodes = NULL;
        code = buildSyncExecNodeList(pRequest, &pExecNodes, pMnodes);

        if (TSDB_CODE_SUCCESS == code) {
          // report how many nodes this call is about to touch
          SSessParam para = {.type = SESSION_MAX_CALL_VNODE_NUM, .value = taosArrayGetSize(pExecNodes)};
          code = tscUpdateSessMgtMetric(pRequest->pTscObj, &para);
        }

        if (TSDB_CODE_SUCCESS == code) {
          code = scheduleQuery(pRequest, pPlan, pExecNodes);
        }
        taosArrayDestroy(pExecNodes);
      }

      taosArrayDestroy(pMnodes);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    default:
      break;
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
    if (ret != TSDB_CODE_SUCCESS) {
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
               pRequest->requestId);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = handleQueryExecRsp(pRequest);
  }

  if (code != TSDB_CODE_SUCCESS) {
    pRequest->code = code;
  }

  if (NULL != res) {
    // hand the result over to the caller and detach it from the request
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }
}
6,672,232✔
1446

1447
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
596,030,227✔
1448
                                 SSqlCallbackWrapper* pWrapper) {
1449
  int32_t code = TSDB_CODE_SUCCESS;
596,030,227✔
1450
  pRequest->type = pQuery->msgType;
596,030,227✔
1451
  SArray*     pMnodeList = NULL;
596,007,663✔
1452
  SQueryPlan* pDag = NULL;
596,007,663✔
1453
  int64_t     st = taosGetTimestampUs();
596,016,914✔
1454

1455
  if (!pRequest->parseOnly) {
596,016,914✔
1456
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
596,003,588✔
1457
    if (NULL == pMnodeList) {
595,996,359✔
1458
      code = terrno;
×
1459
    }
1460
    SPlanContext cxt = {.queryId = pRequest->requestId,
601,675,249✔
1461
                        .acctId = pRequest->pTscObj->acctId,
596,040,074✔
1462
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
596,052,472✔
1463
                        .pAstRoot = pQuery->pRoot,
596,063,001✔
1464
                        .showRewrite = pQuery->showRewrite,
596,065,693✔
1465
                        .isView = pWrapper->pParseCtx->isView,
596,055,701✔
1466
                        .isAudit = pWrapper->pParseCtx->isAudit,
596,044,494✔
1467
                        .pMsg = pRequest->msgBuf,
596,043,695✔
1468
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1469
                        .pUser = pRequest->pTscObj->user,
596,041,126✔
1470
                        .sysInfo = pRequest->pTscObj->sysInfo,
596,021,316✔
1471
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
596,013,342✔
1472
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
596,032,277✔
1473
    if (TSDB_CODE_SUCCESS == code) {
596,041,743✔
1474
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
596,043,272✔
1475
    }
1476
    if (code) {
596,021,496✔
1477
      tscError("req:0x%" PRIx64 ", failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
269,769✔
1478
               pRequest->requestId);
1479
    } else {
1480
      pRequest->body.subplanNum = pDag->numOfSubplans;
595,751,727✔
1481
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
595,760,573✔
1482
    }
1483
  }
1484

1485
  pRequest->metric.execStart = taosGetTimestampUs();
596,021,237✔
1486
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
596,013,933✔
1487

1488
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
598,845,270✔
1489
    SArray* pNodeList = NULL;
595,501,942✔
1490
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
595,464,605✔
1491
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
79,989,235✔
1492
    }
1493

1494
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
595,513,992✔
1495
                             .requestId = pRequest->requestId,
595,517,654✔
1496
                             .requestObjRefId = pRequest->self};
595,535,852✔
1497
    SSchedulerReq    req = {
598,363,961✔
1498
           .syncReq = false,
1499
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
595,511,697✔
1500
           .pConn = &conn,
1501
           .pNodeList = pNodeList,
1502
           .pDag = pDag,
1503
           .allocatorRefId = pRequest->allocatorRefId,
595,511,697✔
1504
           .sql = pRequest->sqlstr,
595,468,874✔
1505
           .startTs = pRequest->metric.start,
595,515,489✔
1506
           .execFp = schedulerExecCb,
1507
           .cbParam = pWrapper,
1508
           .chkKillFp = chkRequestKilled,
1509
           .chkKillParam = (void*)pRequest->self,
595,497,347✔
1510
           .pExecRes = NULL,
1511
           .source = pRequest->source,
595,509,073✔
1512
           .pWorkerCb = getTaskPoolWorkerCb(),
595,485,415✔
1513
    };
1514
    if (TSDB_CODE_SUCCESS == code) {
595,530,856✔
1515
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
595,564,583✔
1516
    }
1517

1518
    taosArrayDestroy(pNodeList);
595,534,748✔
1519
  } else {
1520
    qDestroyQueryPlan(pDag);
527,119✔
1521
    tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
493,289✔
1522
             pRequest->requestId);
1523
    destorySqlCallbackWrapper(pWrapper);
493,289✔
1524
    pRequest->pWrapper = NULL;
493,289✔
1525
    if (TSDB_CODE_SUCCESS != code) {
493,289✔
1526
      pRequest->code = terrno;
269,769✔
1527
    }
1528

1529
    doRequestCallback(pRequest, code);
493,289✔
1530
  }
1531

1532
  // todo not to be released here
1533
  taosArrayDestroy(pMnodeList);
596,065,361✔
1534

1535
  return code;
596,042,794✔
1536
}
1537

1538
/*
 * Dispatch a parsed statement according to its execution mode.
 *
 * pRequest    - request being executed; its callback is invoked directly for
 *               parse-only, empty-result and invalid-mode requests.
 * pQuery      - parsed query; pQuery->execMode selects the execution path.
 * pResultMeta - metadata handed through to the scheduler path.
 * pWrapper    - callback wrapper; destroyed here unless the scheduler path
 *               is taken, in which case it is forwarded to asyncExecSchQuery.
 */
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
  // Parse-only requests are finished as soon as parsing is done.
  if (pRequest->parseOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  pRequest->body.execMode = pQuery->execMode;

  // Only the scheduler path still needs the wrapper.
  const bool viaScheduler = (QUERY_EXEC_MODE_SCHEDULE == pRequest->body.execMode);
  if (!viaScheduler) {
    destorySqlCallbackWrapper(pWrapper);
    pRequest->pWrapper = NULL;
  }

  // Count inserts/selects once per request; retries are not counted again.
  if (NULL != pQuery->pRoot && !pRequest->inRetry) {
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;

    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
    }
  }

  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_SCHEDULE:
      // The return code is not inspected here.
      (void)asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
      break;

    case QUERY_EXEC_MODE_RPC:
      // The return code is not inspected here.
      (void)asyncExecDdlQuery(pRequest, pQuery);
      break;

    case QUERY_EXEC_MODE_LOCAL:
      asyncExecLocalCmd(pRequest, pQuery);
      break;

    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      doRequestCallback(pRequest, 0);
      break;

    default:
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
      doRequestCallback(pRequest, -1);
      break;
  }
}
1584

1585
/*
 * Refresh the catalog entries for every database and table recorded on the
 * request (pRequest->dbList / pRequest->tableList).
 *
 * Returns TSDB_CODE_APP_ERROR when both lists are empty, the first failing
 * catalog call's code otherwise, or success when every refresh succeeded.
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  const int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  const int32_t numOfTables = taosArrayGetSize(pRequest->tableList);
  if (numOfDbs <= 0 && numOfTables <= 0) {
    return TSDB_CODE_APP_ERROR;
  }

  SCatalog* pCtg = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestConnInfo connInfo = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // Both loops stop at the first failure; `code` is success on entry.
  for (int32_t d = 0; d < numOfDbs && TSDB_CODE_SUCCESS == code; ++d) {
    // catalogRefreshDBVgInfo will handle dbFName == null.
    char* dbFName = taosArrayGet(pRequest->dbList, d);
    code = catalogRefreshDBVgInfo(pCtg, &connInfo, dbFName);
  }

  for (int32_t t = 0; t < numOfTables && TSDB_CODE_SUCCESS == code; ++t) {
    // catalogRefreshTableMeta will handle tableName == null.
    SName* pName = taosArrayGet(pRequest->tableList, t);
    code = catalogRefreshTableMeta(pCtg, &connInfo, pName, -1);
  }

  return code;
}
1627

1628
/*
 * Drop cached catalog metadata for every name in tbList.
 *
 * isView selects between view metadata (catalogRemoveViewMeta) and table
 * metadata (catalogRemoveTableMeta). Any failing catalog call returns
 * through TSC_ERR_RET; otherwise TSDB_CODE_SUCCESS is returned.
 */
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
  SCatalog* pCtg = NULL;
  int32_t   numOfNames = taosArrayGetSize(tbList);
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (!isView) {
    for (int32_t k = 0; k < numOfNames; ++k) {
      SName* pName = taosArrayGet(tbList, k);
      TSC_ERR_RET(catalogRemoveTableMeta(pCtg, pName));
    }
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t k = 0; k < numOfNames; ++k) {
    SName* pName = taosArrayGet(tbList, k);
    if (NULL != pName) {
      char fullDbName[TSDB_DB_FNAME_LEN];
      (void)tNameGetFullDbName(pName, fullDbName);
      TSC_ERR_RET(catalogRemoveViewMeta(pCtg, fullDbName, 0, pName->tname, 0));
    }
  }

  return TSDB_CODE_SUCCESS;
}
1655

1656
/*
 * Build the mnode endpoint set from the configured first/second endpoints.
 *
 * firstEp, secondEp - "fqdn:port" strings; NULL or empty strings are skipped.
 * pEpSet            - output; version, numOfEps and inUse are reset first.
 *
 * An endpoint whose FQDN cannot be resolved is logged and dropped rather than
 * treated as fatal. Returns 0 when at least one endpoint was accepted,
 * TSDB_CODE_RPC_NETWORK_UNAVAIL when none was, or an error code when an
 * endpoint string is too long or cannot be parsed.
 */
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      // Return the real error code (previously a bare -1), matching the
      // secondEp branch below.
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    // The resolved address is only used to verify that the FQDN resolves.
    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      // Wipe the slot so the next endpoint can reuse it.
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  // Neither endpoint was usable.
  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  return 0;
}
1715

1716
/*
 * Create a connection object, send a TDMT_MND_CONNECT request and wait on the
 * request's response semaphore for the outcome.
 *
 * user, auth, db - credentials and default database passed to createTscObj.
 * totpCode       - TOTP code placed into the connect message.
 * fp, param      - not used by this function.
 * pAppInfo       - application instance the connection belongs to.
 * connType       - connection type; CONN_TYPE__AUTH_TEST closes the
 *                  connection again after a successful handshake.
 * pTscObj        - output; set to NULL on entry, when the server rejects the
 *                  connection, and for auth-test connections.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 *
 * NOTE(review): the early-failure paths (taosStrdup, buildConnectMsg, send,
 * sem wait) do not release pRequest, and the taosStrdup path does not release
 * the connection object either -- confirm ownership before changing them.
 */
int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp,
                        void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
  *pTscObj = NULL;
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pRequest = NULL;
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    return code;
  }

  pRequest->sqlstr = taosStrdup("taos_connect");
  if (pRequest->sqlstr) {
    pRequest->sqlLen = strlen(pRequest->sqlstr);
  } else {
    return terrno;
  }

  SMsgSendInfo* body = NULL;
  code = buildConnectMsg(pRequest, &body, totpCode);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    return code;
  }

  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
    return code;
  }
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
    destroyTscObj(*pTscObj);
    tscError("failed to wait sem, code:%s", terrstr());
    return terrno;
  }
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    // `code` is always success here (the send succeeded), so the decision
    // must be made on the request's own result code.
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    tscError("failed to connect to server, reason: %s", errorMsg);

    terrno = pRequest->code;
    destroyRequest(pRequest);
    taos_close_internal(*pTscObj);
    *pTscObj = NULL;
    return terrno;
  }
  if (connType == CONN_TYPE__AUTH_TEST) {
    // Credentials were accepted; an auth test does not keep the connection.
    terrno = TSDB_CODE_SUCCESS;
    destroyRequest(pRequest);
    taos_close_internal(*pTscObj);
    *pTscObj = NULL;
    return TSDB_CODE_SUCCESS;
  }

  tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
          (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
  destroyRequest(pRequest);
  return code;
}
1781

1782
/*
 * Allocate and fill the send-info structure for a TDMT_MND_CONNECT request.
 *
 * pRequest     - request whose ids and connection object describe the login.
 * pMsgSendInfo - output; receives the newly allocated send info on success
 *                and is set to NULL on failure.
 * totpCode     - TOTP code copied into the connect request.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code found in terrno at the point
 * of failure. Everything allocated here, including the `param` cell, is
 * released on failure.
 */
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pReq = NULL;

  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (*pMsgSendInfo == NULL) {
    return terrno;
  }

  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;

  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
  (*pMsgSendInfo)->requestId = pRequest->requestId;
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
  if (NULL == (*pMsgSendInfo)->param) {
    code = terrno;
    goto _error;
  }

  // The response handler receives the request ref id through `param`.
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;

  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  } else if (terrno) {
    code = terrno;
    goto _error;
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = appInfo.pid;
  connectReq.startTime = appInfo.startTime;
  connectReq.totpCode = totpCode;

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));

  // First call sizes the buffer, second call fills it.
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  pReq = taosMemoryMalloc(contLen);
  if (NULL == pReq) {
    code = terrno;
    goto _error;
  }

  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
    code = terrno;
    goto _error;
  }

  (*pMsgSendInfo)->msgInfo.len = contLen;
  (*pMsgSendInfo)->msgInfo.pData = pReq;
  return TSDB_CODE_SUCCESS;

_error:
  // Release every allocation made above; `param` used to leak on these paths.
  taosMemoryFree(pReq);
  taosMemoryFree((*pMsgSendInfo)->param);
  taosMemoryFree(*pMsgSendInfo);
  *pMsgSendInfo = NULL;
  return code;
}
1840

1841
/*
 * Apply an endpoint-set change reported with a response.
 *
 * pSendInfo - send info of the original message; its target type decides
 *             which cache is updated.
 * pTscObj   - connection object, may be NULL (then only an error is logged).
 * pMsg      - response message, used for logging only.
 * pEpSet    - new endpoint set; NULL means nothing changed.
 */
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    return;
  }

  if (TARGET_TYPE_MNODE == pSendInfo->target.type) {
    if (pTscObj == NULL) {
      tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
               TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
      return;
    }

    // Snapshot the current mnode epset only to log the transition.
    SEpSet current = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
    SEp*   pOldEp = &current.eps[current.inUse];
    SEp*   pNewEp = &pEpSet->eps[pEpSet->inUse];
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", current.inUse, current.numOfEps,
             pOldEp->fqdn, pOldEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
    return;
  }

  if (TARGET_TYPE_VNODE == pSendInfo->target.type) {
    if (pTscObj == NULL) {
      tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
               TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
      return;
    }

    SCatalog* pCtg = NULL;
    int32_t   rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
    if (TSDB_CODE_SUCCESS != rc) {
      tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
               tstrerror(rc));
      return;
    }

    rc = catalogUpdateVgEpSet(pCtg, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
    if (TSDB_CODE_SUCCESS != rc) {
      tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
               tstrerror(rc));
      return;
    }

    taosMemoryFreeClear(pSendInfo->target.dbFName);
    return;
  }

  tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
}
1891

1892
/*
 * Deliver one server response to the handler registered in its send info.
 *
 * pMsg   - response message; pMsg->pCont is freed here on every path.
 * pEpSet - optional heap copy of an updated endpoint set; it is freed here
 *          on the early-error paths and otherwise handed to the handler
 *          through SDataBuf.pEpSet.
 *
 * Returns TSDB_CODE_TSC_INTERNAL_ERROR when the message carries no send info
 * or the acquired request does not match the recorded ref id; otherwise
 * TSDB_CODE_SUCCESS (the handler's own return value is ignored).
 */
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pMsg->info.ahandle == NULL) {
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
    rpcFreeCont(pMsg->pCont);
    taosMemoryFree(pEpSet);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  STscObj*     pTscObj = NULL;
  SRequestObj* pRequest = NULL;  // non-NULL while we hold a ref on the request

  STraceId* trace = &pMsg->info.traceId;
  char      tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));

  if (pSendInfo->requestObjRefId != 0) {
    pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest) {
      if (pRequest->self != pSendInfo->requestObjRefId) {
        // Hex conversion to match the "0x" prefix (was PRId64).
        tscError("doProcessMsgFromServer req:0x%" PRIx64 " != pSendInfo->requestObjRefId:0x%" PRIx64, pRequest->self,
                 pSendInfo->requestObjRefId);

        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
          tscError("doProcessMsgFromServer taosReleaseRef failed");
        }
        rpcFreeCont(pMsg->pCont);
        taosMemoryFree(pEpSet);
        destroySendMsgInfo(pSendInfo);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pTscObj = pRequest->pTscObj;
    }
  }

  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

  SDataBuf buf = {.msgType = pMsg->msgType,
                  .len = pMsg->contLen,
                  .pData = NULL,
                  .handle = pMsg->info.handle,
                  .handleRefId = pMsg->info.refId,
                  .pEpSet = pEpSet};

  // The handler gets its own copy of the payload; pMsg->pCont is freed below.
  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      pMsg->code = terrno;
    } else {
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  // Release the ref whenever it was acquired, not only when the request
  // carries a connection object, so the ref cannot leak.
  if (pRequest) {
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("doProcessMsgFromServer taosReleaseRef failed");
      terrno = code;
      pMsg->code = code;
    }
  }

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
  return TSDB_CODE_SUCCESS;
}
1962

1963
int32_t doProcessMsgFromServer(void* param) {
1,001,203,040✔
1964
  AsyncArg* arg = (AsyncArg*)param;
1,001,203,040✔
1965
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
1,001,203,040✔
1966
  taosMemoryFree(arg);
1,001,160,421✔
1967
  return code;
1,001,160,071✔
1968
}
1969

1970
/*
 * RPC callback for responses from the server.
 *
 * The response (and a heap copy of pEpSet, if any) is queued through
 * taosAsyncExec for doProcessMsgFromServer. If any allocation or the
 * scheduling itself fails, the failure code is stored in pMsg->code and the
 * message is processed synchronously on the calling thread instead.
 *
 * parent is not used by this function.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  int32_t   code = 0;
  SEpSet*   pEpCopy = NULL;
  AsyncArg* pArg = NULL;

  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);

  if (NULL != pEpSet) {
    pEpCopy = taosMemoryCalloc(1, sizeof(SEpSet));
    if (pEpCopy == NULL) {
      code = terrno;
      pMsg->code = terrno;
      goto _inline;
    }
    (void)memcpy((void*)pEpCopy, (void*)pEpSet, sizeof(SEpSet));
  }

  // pMsg is response msg
  const bool isConnectRsp = (pMsg->msgType == TDMT_MND_CONNECT + 1);
  if (TSDB_CODE_RPC_SOMENODE_BROKEN_LINK == pMsg->code) {
    // connect rsp: restore origin code; others: uniform to one error code,
    // TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
    pMsg->code = isConnectRsp ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
  } else if (isConnectRsp && TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED == pMsg->code) {
    // restore origin code
    pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  pArg = taosMemoryCalloc(1, sizeof(AsyncArg));
  if (pArg == NULL) {
    code = terrno;
    pMsg->code = code;
    goto _inline;
  }

  pArg->msg = *pMsg;
  pArg->pEpset = pEpCopy;

  code = taosAsyncExec(doProcessMsgFromServer, pArg, NULL);
  if (0 == code) {
    return;
  }

  pMsg->code = code;
  taosMemoryFree(pArg);

_inline:
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
  code = doProcessMsgFromServerImpl(pMsg, pEpCopy);
  if (0 != code) {
    tscError("failed to sched msg to tsc, tsc ready quit");
  }
}
2025

2026
TAOS* taos_connect_totp(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
34✔
2027
                        uint16_t port) {
2028
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
34✔
2029
  if (user == NULL) {
34✔
2030
    user = TSDB_DEFAULT_USER;
×
2031
  }
2032

2033
  if (pass == NULL) {
34✔
2034
    pass = TSDB_DEFAULT_PASS;
×
2035
  }
2036

2037
  STscObj* pObj = NULL;
34✔
2038
  int32_t  code = taos_connect_internal(ip, user, pass, NULL, totp, db, port, CONN_TYPE__QUERY, &pObj);
34✔
2039
  if (TSDB_CODE_SUCCESS == code) {
34✔
2040
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
34✔
2041
    if (NULL == rid) {
34✔
2042
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2043
      return NULL;
×
2044
    }
2045
    *rid = pObj->id;
34✔
2046
    return (TAOS*)rid;
34✔
2047
  } else {
2048
    terrno = code;
×
2049
  }
2050

2051
  return NULL;
×
2052
}
2053

2054
int taos_connect_test(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
×
2055
                      uint16_t port) {
2056
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
×
2057
  if (user == NULL) {
×
2058
    user = TSDB_DEFAULT_USER;
×
2059
  }
2060

2061
  if (pass == NULL) {
×
2062
    pass = TSDB_DEFAULT_PASS;
×
2063
  }
2064

2065
  STscObj* pObj = NULL;
×
2066
  return taos_connect_internal(ip, user, pass, NULL, totp, db, port, CONN_TYPE__AUTH_TEST, &pObj);
×
2067
}
2068

2069
// Token-based connect: currently a stub that ignores every argument and
// always returns NULL.
TAOS* taos_connect_token(const char* ip, const char* token, const char* db, uint16_t port) { return NULL; }
×
2070

2071
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
320✔
2072
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
320✔
2073
  if (user == NULL) {
320✔
2074
    user = TSDB_DEFAULT_USER;
×
2075
  }
2076

2077
  if (auth == NULL) {
320✔
2078
    tscError("No auth info is given, failed to connect to server");
×
2079
    return NULL;
×
2080
  }
2081

2082
  STscObj* pObj = NULL;
320✔
2083
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, NULL, db, port, CONN_TYPE__QUERY, &pObj);
320✔
2084
  if (TSDB_CODE_SUCCESS == code) {
320✔
2085
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
134✔
2086
    if (NULL == rid) {
134✔
2087
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2088
    }
2089
    *rid = pObj->id;
134✔
2090
    return (TAOS*)rid;
134✔
2091
  }
2092

2093
  return NULL;
186✔
2094
}
2095

2096
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
2097
//                      const char* db, int dbLen, uint16_t port) {
2098
//   char ipStr[TSDB_EP_LEN] = {0};
2099
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
2100
//   char userStr[TSDB_USER_LEN] = {0};
2101
//   char passStr[TSDB_PASSWORD_LEN] = {0};
2102
//
2103
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
2104
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
2105
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
2106
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
2107
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
2108
// }
2109

2110
/*
 * Point pResultInfo->row[] / length[] at the values of the current row
 * (index pResultInfo->current) for every column. NULL values get a NULL
 * pointer and a length of 0.
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    SResultColumn* pColumn = &pResultInfo->pCol[col];

    int32_t colType = pResultInfo->fields[col].type;
    int32_t colBytes = calcSchemaBytesFromTypeBytes(colType, pResultInfo->userFields[col].bytes, false);

    // Fixed-width column: the value sits at a constant stride.
    if (!IS_VAR_DATA_TYPE(colType)) {
      if (colDataIsNull_f(pColumn, pResultInfo->current)) {
        pResultInfo->row[col] = NULL;
        pResultInfo->length[col] = 0;
      } else {
        pResultInfo->row[col] = pResultInfo->pCol[col].pData + colBytes * pResultInfo->current;
        pResultInfo->length[col] = colBytes;
      }
      continue;
    }

    // Variable-width column: an offset of -1 marks a NULL value.
    if (IS_VAR_NULL_TYPE(colType, colBytes) || pColumn->offset[pResultInfo->current] == -1) {
      pResultInfo->row[col] = NULL;
      pResultInfo->length[col] = 0;
      continue;
    }

    char* pValue = pResultInfo->pCol[col].offset[pResultInfo->current] + pResultInfo->pCol[col].pData;
    if (IS_STR_DATA_BLOB(colType)) {
      pResultInfo->length[col] = blobDataLen(pValue);
      pResultInfo->row[col] = blobDataVal(pValue);
    } else {
      pResultInfo->length[col] = varDataLen(pValue);
      pResultInfo->row[col] = varDataVal(pValue);
    }
  }
}
2,035,823,162✔
2143

2144
/*
 * Synchronously fetch result rows for a request.
 *
 * When the current block is exhausted (or none has been fetched yet) a new
 * block is pulled through schedulerFetchRows and decoded with
 * setQueryResultFromRsp. If setupOneRowPtr is true the row pointers are
 * positioned on the current row and the cursor advances by one.
 *
 * Returns pResultInfo->row, or NULL when pRequest is NULL, the result is
 * complete, the fetched block is empty, or an error occurred (the error is
 * stored in pRequest->code).
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (NULL == pRequest) {
    return NULL;
  }

  SReqResultInfo* pInfo = &pRequest->body.resInfo;
  const bool      blockDrained = (pInfo->pData == NULL) || (pInfo->current >= pInfo->numOfRows);

  if (blockDrained) {
    // All data has returned to App already, no need to try again
    if (pInfo->completed) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    SSchedulerReq fetchReq = {.syncReq = true, .pFetchRes = (void**)&pInfo->pData};

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &fetchReq);
    if (TSDB_CODE_SUCCESS == pRequest->code) {
      pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pInfo->pData,
                                             convertUcs4, pRequest->stmtBindVersion > 0);
    }
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
             ", complete:%d, QID:0x%" PRIx64,
             pRequest->self, pInfo->numOfRows, pInfo->totalRows, pInfo->completed, pRequest->requestId);

    // Account the fetched payload in the per-cluster summary.
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    (void)atomic_add_fetch_64((int64_t*)&pSummary->fetchBytes, pRequest->body.resInfo.payloadLen);

    if (0 == pInfo->numOfRows) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pInfo);
    pInfo->current += 1;
  }

  return pInfo->row;
}
2193

2194
/*
 * Fetch callback used by doAsyncFetchRows: `param` is the semaphore to post;
 * res and numOfRows are ignored.
 */
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
  tsem_t* pSem = (tsem_t*)param;

  int32_t rc = tsem_post(pSem);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to post sem, code:%s", terrstr());
  }
}
88,916,119✔
2200

2201
/*
 * Fetch result rows by driving the asynchronous fetch API and blocking on a
 * local semaphore until its callback fires.
 *
 * pRequest       - request to fetch from; NULL yields NULL.
 * setupOneRowPtr - when true, position the row pointers on the current row
 *                  and advance the cursor by one.
 * convertUcs4    - stored in the result info before fetching.
 *
 * Returns pResultInfo->row, or NULL when the result is complete, the block
 * is empty, or pRequest->code holds an error.
 */
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (pRequest == NULL) {
    return NULL;
  }

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    // All data has returned to App already, no need to try again
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    // convert ucs4 to native multi-bytes string
    pResultInfo->convertUcs4 = convertUcs4;

    tsem_t  sem;
    int32_t semCode = tsem_init(&sem, 0, 0);
    if (TSDB_CODE_SUCCESS != semCode) {
      // Do not continue: waiting on or destroying an uninitialised
      // semaphore is undefined. Report the failure instead.
      tscError("failed to init sem, code:%s", terrstr());
      pRequest->code = semCode;
      return NULL;
    }

    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
      tscError("failed to wait sem, code:%s", terrstr());
    }
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
      tscError("failed to destroy sem, code:%s", terrstr());
    }
    pRequest->inCallback = false;
  }

  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
  }

  return pResultInfo->row;
}
2241

2242
/*
 * Lazily allocate the per-column arrays of a result info (row, pCol, length,
 * convertBuf), each sized by pResInfo->numOfCols. Nothing is done when
 * pResInfo->row is already set.
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno value observed when any of the
 * four allocations failed. On failure all four pointers are freed and reset
 * to NULL so a later call or destroy cannot touch freed memory.
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row == NULL) {
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
      // Capture the error before the cleanup calls below.
      int32_t code = terrno;

      // Free *and clear*: stale non-NULL pointers would defeat the
      // `row == NULL` check above and be freed a second time later.
      taosMemoryFreeClear(pResInfo->row);
      taosMemoryFreeClear(pResInfo->pCol);
      taosMemoryFreeClear(pResInfo->length);
      taosMemoryFreeClear(pResInfo->convertBuf);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
2260

2261
// Re-encodes every non-empty NCHAR column of the current block through the converter acquired with C2M.
// Converted values are packed back to back (var-data header + payload) into convertBuf[col], the row offsets
// are rewritten to point into that buffer, and pCol[col].pData / row[col] are redirected to it.
//
// colLength holds the byte length of each column's data section; it is also the size of the conversion buffer.
// Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or TSDB_CODE_TSC_INTERNAL_ERROR when the converter
// cannot be acquired or a converted value fails the length checks.
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
  int32_t convIdx = -1;
  iconv_t cd = taosAcquireConv(&convIdx, C2M, pResultInfo->charsetCxt);
  if (cd == (iconv_t)-1) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    int32_t colType = pResultInfo->fields[col].type;
    int32_t maxBytes =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[col].type, pResultInfo->fields[col].bytes, isStmt);

    if (colType != TSDB_DATA_TYPE_NCHAR || colLength[col] <= 0) {
      continue;
    }

    char* pWrite = taosMemoryRealloc(pResultInfo->convertBuf[col], colLength[col]);
    if (pWrite == NULL) {
      taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
      return terrno;
    }
    pResultInfo->convertBuf[col] = pWrite;

    char* const    pBufStart = pResultInfo->convertBuf[col];
    char* const    pBufEnd = pBufStart + colLength[col];
    SResultColumn* pColInfo = &pResultInfo->pCol[col];

    for (int32_t row = 0; row < pResultInfo->numOfRows; ++row) {
      // an offset of -1 marks a row without a value in this column
      if (pColInfo->offset[row] == -1) {
        continue;
      }

      char*   pSrc = pColInfo->pData + pColInfo->offset[row];
      int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pSrc), varDataLen(pSrc), varDataVal(pWrite), cd);

      bool lenOk = (len >= 0) && (len <= maxBytes) && ((pWrite + len) < pBufEnd);
      if (!lenOk) {
        tscError(
            "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
            "colLength[i]):%p",
            len, maxBytes, (pWrite + len), pBufEnd);
        taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      varDataSetLen(pWrite, len);
      pColInfo->offset[row] = (pWrite - pBufStart);
      pWrite += (len + VARSTR_HEADER_SIZE);
    }

    // from now on the column is served out of the converted buffer
    pColInfo->pData = pBufStart;
    pResultInfo->row[col] = pColInfo->pData;
  }

  taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
  return TSDB_CODE_SUCCESS;
}
2308

2309
// Replaces the data of every decimal column that has data with its textual form.
// Each row is rendered by decimalToStr() into a fixed 64-byte slot of convertBuf[col]; the reported field
// width (fields[col].bytes and userFields[col].bytes) is set to that slot size, and pCol[col].pData / row[col]
// are redirected to the converted buffer.
//
// Returns 0 on success, terrno when the buffer cannot be (re)allocated, or the decimalToStr() error code.
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + col;
    TAOS_FIELD*   pField = pResultInfo->userFields + col;
    int32_t       colType = pFieldE->type;

    if (!IS_DECIMAL_TYPE(colType) || !pResultInfo->pCol[col].pData) {
      continue;
    }

    int32_t slotBytes = 64;
    char*   pOut = taosMemoryRealloc(pResultInfo->convertBuf[col], slotBytes * pResultInfo->numOfRows);
    pFieldE->bytes = slotBytes;
    pField->bytes = slotBytes;
    if (!pOut) {
      return terrno;
    }
    pResultInfo->convertBuf[col] = pOut;

    for (int32_t row = 0; row < pResultInfo->numOfRows; ++row) {
      DecimalWord* pSrc = (DecimalWord*)(pResultInfo->pCol[col].pData + row * tDataTypes[colType].bytes);
      int32_t      code = decimalToStr(pSrc, colType, pFieldE->precision, pFieldE->scale, pOut, slotBytes);
      pOut += slotBytes;
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
    }

    pResultInfo->pCol[col].pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pResultInfo->pCol[col].pData;
  }

  return 0;
}
2340

2341
// Size in bytes of the block meta section that precedes the per-column length array:
//   | version | total length | total rows | total columns | flag seg | block group id | column schema |
// The column schema holds, for every column, a one-byte type followed by a four-byte length.
// The `p` argument is not inspected.
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
  // version + total length, then rows / columns / flag segment, then the 64-bit group id
  const size_t fixedBytes = sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t);
  const size_t schemaEntryBytes = sizeof(int8_t) + sizeof(int32_t);

  return fixedBytes + numOfCols * schemaEntryBytes;
}
2345

2346
// Estimates how many bytes the block in pResultInfo->pData needs once its JSON columns are rendered as text.
//
// Block layout walked here:
//   | version | total length | total rows | total columns | flag seg | block group id | column schema |
//   | each column length | column 0 | column 1 | ... | blankfill |
// Non-JSON columns contribute their prefix (row offsets or null bitmap) plus their stored length. JSON columns
// contribute the row offsets plus the larger of the stored length and a per-row text estimate.
//
// Returns the estimated size, TSDB_CODE_TSC_INTERNAL_ERROR when the column count in the block does not match
// pResultInfo->numOfCols, or -1 when a JSON value carries an unexpected inner type.
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
  char*   pBlock = (char*)pResultInfo->pData;
  int32_t version = *(int32_t*)pBlock;

  int32_t rowCnt = pResultInfo->numOfRows;
  int32_t colCnt = pResultInfo->numOfCols;

  // the column count is the fourth int32 of the block header
  int32_t colsInBlock = *(int32_t*)(pBlock + sizeof(int32_t) * 3);
  if (colCnt != colsInBlock) {
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", colCnt, colsInBlock);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t  total = getVersion1BlockMetaSize(pBlock, colCnt);
  int32_t* pColLen = (int32_t*)(pBlock + total);
  total += sizeof(int32_t) * colCnt;

  char* pCursor = pBlock + total;
  for (int32_t col = 0; col < colCnt; ++col) {
    int32_t storedLen = pColLen[col];
    if (version == BLOCK_VERSION_1) {
      storedLen = htonl(pColLen[col]);
    }

    if (pResultInfo->fields[col].type != TSDB_DATA_TYPE_JSON) {
      int32_t prefixLen = 0;
      if (IS_VAR_DATA_TYPE(pResultInfo->fields[col].type)) {
        prefixLen = rowCnt * sizeof(int32_t);
      } else {
        prefixLen = BitmapLen(pResultInfo->numOfRows);
      }

      total += (prefixLen + storedLen);
      pCursor += prefixLen;
      pCursor += storedLen;
      continue;
    }

    int32_t* pOffsets = (int32_t*)pCursor;
    int32_t  offsetBytes = rowCnt * sizeof(int32_t);
    total += offsetBytes;
    pCursor += offsetBytes;

    int32_t textLen = 0;
    for (int32_t row = 0; row < rowCnt; ++row) {
      if (pOffsets[row] == -1) {
        continue;
      }

      char*   pValue = pCursor + pOffsets[row];
      int32_t innerType = *pValue;
      char*   pInner = pValue + CHAR_BYTES;

      if (innerType == TSDB_DATA_TYPE_NULL) {
        textLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
      } else if (tTagIsJson(pValue)) {
        textLen += (VARSTR_HEADER_SIZE + ((const STag*)(pValue))->len);
      } else if (innerType == TSDB_DATA_TYPE_NCHAR) {
        // value -> "value": room for the two surrounding quotes
        textLen += varDataTLen(pInner) + CHAR_BYTES * 2;
      } else if (innerType == TSDB_DATA_TYPE_DOUBLE) {
        textLen += (VARSTR_HEADER_SIZE + 32);
      } else if (innerType == TSDB_DATA_TYPE_BOOL) {
        textLen += (VARSTR_HEADER_SIZE + 5);
      } else if (IS_STR_DATA_BLOB(innerType)) {
        textLen += (BLOBSTR_HEADER_SIZE + 32);
      } else {
        tscError("estimateJsonLen error: invalid type:%d", innerType);
        return -1;
      }
    }

    total += TMAX(storedLen, textLen);
    pCursor += storedLen;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  total += sizeof(bool);
  return total;
}
2419

2420
// Rebuilds the block referenced by pResultInfo->pData so that every JSON column holds printable text.
//
// When the result has no JSON column the function returns immediately. Otherwise a new buffer
// (pResultInfo->convertJson, sized by estimateJsonLen()) receives a copy of the block in which each JSON value
// is replaced by a var-string: "null", the serialised tag JSON, a quoted and escaped string, a "%.9lf" number,
// or "true"/"false". Row offsets and the column length of JSON columns are rewritten accordingly, the total
// length field of the block header is updated, and pResultInfo->pData is redirected to the new buffer.
//
// Returns TSDB_CODE_SUCCESS, terrno on allocation / tag-parsing failure, or TSDB_CODE_TSC_INTERNAL_ERROR on
// malformed input.
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
  int32_t numOfRows = pResultInfo->numOfRows;
  int32_t numOfCols = pResultInfo->numOfCols;
  bool    needConvert = false;
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      needConvert = true;
      break;
    }
  }

  if (!needConvert) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to convert form json format string");

  char*   p = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)p;
  int32_t dataLen = estimateJsonLen(pResultInfo);
  if (dataLen <= 0) {
    tscError("doConvertJson error: estimateJsonLen failed");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->convertJson);
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
  if (pResultInfo->convertJson == NULL) return terrno;
  char* p1 = pResultInfo->convertJson;

  int32_t totalLen = 0;
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
  if (numOfCols != cols) {
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // block meta (header + column schema) is copied unchanged
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
  (void)memcpy(p1, p, len);

  p += len;
  p1 += len;
  totalLen += len;

  // per-column lengths: copied first, JSON entries are overwritten below
  len = sizeof(int32_t) * numOfCols;
  int32_t* colLength = (int32_t*)p;
  int32_t* colLength1 = (int32_t*)p1;
  (void)memcpy(p1, p, len);
  p += len;
  p1 += len;
  totalLen += len;

  char* pStart = p;
  char* pStart1 = p1;
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
    if (colLen >= dataLen) {
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      int32_t* offset = (int32_t*)pStart;
      int32_t* offset1 = (int32_t*)pStart1;
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;

      // from here on `len` is the number of bytes already written into this column's data section
      len = 0;
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {
          continue;
        }
        char* data = offset[j] + pStart;

        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (tTagIsJson(data)) {
          char* jsonString = NULL;
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
          if (jsonString == NULL) {
            tscError("doConvertJson error: parseTagDatatoJson failed");
            return terrno;
          }
          STR_TO_VARSTR(dst, jsonString);
          taosMemoryFree(jsonString);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          *(char*)varDataVal(dst) = '\"';
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(tmp),
                                         pResultInfo->charsetCxt);
          if (length <= 0) {
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
            length = 0;
          }
          // The escaped text starts after the var-data header and the opening quote and must leave room for the
          // closing quote, so the header size has to be subtracted from the capacity as well.
          int32_t escapeLength =
              escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE - CHAR_BYTES * 2,
                              varDataVal(tmp), length);
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
          // per-value trace: debug level, this is not an error condition
          tscDebug("value:%s.", varDataVal(dst));
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          double jsonVd = *(double*)(jsonInnerData);
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else {
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        // The output buffer was sized from an estimate; never write past it. At this point `totalLen` is the
        // offset of this column's data section inside convertJson and `len` the bytes already used in it.
        int32_t dstLen = varDataTLen(dst);
        if (totalLen + len + dstLen > dataLen) {
          tscError("doConvertJson error: converted length:%d exceeds estimated dataLen:%d", totalLen + len + dstLen,
                   dataLen);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        offset1[j] = len;
        (void)memcpy(pStart1 + len, dst, dstLen);
        len += dstLen;
      }
      colLen1 = len;
      totalLen += colLen1;
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      // variable-length column: row offsets followed by the data, copied unchanged
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    } else {
      // fixed-length column: null bitmap followed by the data, copied unchanged
      len = BitmapLen(pResultInfo->numOfRows);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    }
    pStart += colLen;
    pStart1 += colLen1;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  // (void)memcpy(pStart1, pStart, sizeof(bool));
  totalLen += sizeof(bool);

  // the second int32 of the block header is the total length
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
  pResultInfo->pData = pResultInfo->convertJson;
  return TSDB_CODE_SUCCESS;
}
2577

2578
// Parses the block in pResultInfo->pData and points the per-column bookkeeping (pCol[], length[], row[]) at it.
//
// Steps: allocate the bookkeeping arrays if needed, convert JSON columns, validate the block header against
// numOfRows / numOfCols, walk the column schema (filling precision/scale of decimal fields that do not have
// them yet), then walk the column data. When convertUcs4 is set, NCHAR columns are re-encoded and decimal
// columns are rendered as text afterwards.
//
// Note: for BLOCK_VERSION_1 blocks the per-column length array inside the block is byte-swapped in place.
//
// Returns TSDB_CODE_SUCCESS (also when the block has no rows), TSDB_CODE_TSC_INTERNAL_ERROR on invalid
// arguments or a malformed block, or the error code of a failing helper.
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
    tscError("setResultDataPtr paras error");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pResultInfo->numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pResultInfo->pData == NULL) {
    tscError("setResultDataPtr error: pData is NULL");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  code = doConvertJson(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  char* p = (char*)pResultInfo->pData;

  // version:
  int32_t blockVersion = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t dataLen = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t rows = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t cols = *(int32_t*)p;
  p += sizeof(int32_t);

  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // the flag segment and the block group id are not used here; just step over them
  p += sizeof(int32_t);
  p += sizeof(uint64_t);

  // check fields
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    int8_t type = *(int8_t*)p;
    p += sizeof(int8_t);

    // Schema entries are 5 bytes wide, so this int32 is generally not 4-byte aligned: copy it out instead of
    // dereferencing a misaligned int32_t pointer.
    int32_t bytes = 0;
    (void)memcpy(&bytes, p, sizeof(bytes));
    p += sizeof(int32_t);

    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
    }
  }

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * pResultInfo->numOfCols;

  char* pStart = p;
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    if ((pStart - pResultInfo->pData) >= dataLen) {
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (blockVersion == BLOCK_VERSION_1) {
      colLength[i] = htonl(colLength[i]);
    }
    if (colLength[i] >= dataLen) {
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
      tscError("invalid type %d", pResultInfo->fields[i].type);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }

    // variable-length columns start with one int32 offset per row, fixed-length ones with a null bitmap
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
    pResultInfo->length[i] =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
  }

  p = pStart;
  // bool blankFill = *(bool*)p;
  p += sizeof(bool);
  int32_t offset = p - pResultInfo->pData;
  if (offset > dataLen) {
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
  }
#endif
  // decimal-to-text conversion is requested by the same flag as the charset conversion
  if (TSDB_CODE_SUCCESS == code && convertUcs4) {
    code = convertDecimalType(pResultInfo);
  }
  return code;
}
2698

2699
// Returns a heap copy of the connection's current database name, or NULL when no database is selected or the
// copy could not be allocated. The name is read while holding pObj->mutex, and terrno is reset to
// TSDB_CODE_SUCCESS on entry. The returned string is a separate allocation made by taosStrndup().
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;

  terrno = TSDB_CODE_SUCCESS;
  (void)taosThreadMutexLock(&pObj->mutex);

  // an empty name means no database has been selected
  if (pObj->db[0] != '\0') {
    dbCopy = taosStrndup(pObj->db, tListLen(pObj->db));
    if (NULL == dbCopy) {
      tscError("failed to taosStrndup db name");
    }
  }

  (void)taosThreadMutexUnlock(&pObj->mutex);
  return dbCopy;
}
2714

2715
// Stores `db` as the current database of the connection, truncated to the size of pTscObj->db.
// The write happens under pTscObj->mutex. A NULL connection or name is logged and ignored.
void setConnectionDB(STscObj* pTscObj, const char* db) {
  bool argsOk = (db != NULL) && (pTscObj != NULL);
  if (!argsOk) {
    tscError("setConnectionDB para is NULL");
    return;
  }

  (void)taosThreadMutexLock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
}
2725

2726
// Clears the current database of the connection by emptying pTscObj->db under pTscObj->mutex.
// A NULL connection is ignored.
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj != NULL) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = '\0';
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
  }
}
2735

2736
// Installs a retrieve response as the current block of a result set.
//
// The previous response message is freed and replaced by pRsp (stored in pResultInfo->pRspMsg). Row count,
// completion flag and precision are taken from the response. When the response is flagged as compressed the
// decompression buffer is grown to payloadLen, and a payload whose compressed length is smaller than its raw
// length is decompressed into it; otherwise pData points directly into the response. Finally the column
// pointers are set up through setResultDataPtr().
//
// Returns the setResultDataPtr() result, terrno on allocation / decompression failure, or
// TSDB_CODE_TSC_INTERNAL_ERROR on NULL arguments or inconsistent lengths.
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
                              bool isStmt) {
  if (pResultInfo == NULL || pRsp == NULL) {
    tscError("setQueryResultFromRsp paras is null");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->pRspMsg);
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->precision = pRsp->precision;

  // decompress data if needed
  int32_t payloadLen = htonl(pRsp->payloadLen);

  // make sure the decompression buffer can hold payloadLen bytes
  if (pRsp->compressed && (pResultInfo->decompBuf == NULL || pResultInfo->decompBufSize < payloadLen)) {
    char* pBuf = NULL;
    if (pResultInfo->decompBuf == NULL) {
      pBuf = taosMemoryMalloc(payloadLen);
    } else {
      pBuf = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
    }

    if (pBuf == NULL) {
      tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
      return terrno;
    }

    pResultInfo->decompBuf = pBuf;
    pResultInfo->decompBufSize = payloadLen;
  }

  if (payloadLen > 0) {
    // the payload starts with two int32 values: compressed length and raw length
    int32_t compLen = *(int32_t*)pRsp->data;
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
    char*   pPayload = (char*)pRsp->data + sizeof(int32_t) * 2;

    bool needDecompress = pRsp->compressed && (compLen < rawLen);
    if (!needDecompress) {
      pResultInfo->pData = pPayload;
      pResultInfo->payloadLen = htonl(pRsp->compLen);
      if (pRsp->compLen != pRsp->payloadLen) {
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
    } else {
      int32_t len = tsDecompressString(pPayload, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      if (len < 0) {
        tscError("tsDecompressString failed");
        return terrno ? terrno : TSDB_CODE_FAILED;
      }
      if (len != rawLen) {
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pResultInfo->pData = pResultInfo->decompBuf;
      pResultInfo->payloadLen = rawLen;
    }
  }

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  return setResultDataPtr(pResultInfo, convertUcs4, isStmt);
}
2809

2810
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
469✔
2811
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
469✔
2812
  void*              clientRpc = NULL;
469✔
2813
  SServerStatusRsp   statusRsp = {0};
469✔
2814
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
469✔
2815
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
469✔
2816
  SRpcMsg  rpcRsp = {0};
469✔
2817
  SRpcInit rpcInit = {0};
469✔
2818
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
469✔
2819

2820
  rpcInit.label = "CHK";
469✔
2821
  rpcInit.numOfThreads = 1;
469✔
2822
  rpcInit.cfp = NULL;
469✔
2823
  rpcInit.sessions = 16;
469✔
2824
  rpcInit.connType = TAOS_CONN_CLIENT;
469✔
2825
  rpcInit.idleTime = tsShellActivityTimer * 1000;
469✔
2826
  rpcInit.compressSize = tsCompressMsgSize;
469✔
2827
  rpcInit.user = "_dnd";
469✔
2828

2829
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
469✔
2830
  connLimitNum = TMAX(connLimitNum, 10);
469✔
2831
  connLimitNum = TMIN(connLimitNum, 500);
469✔
2832
  rpcInit.connLimitNum = connLimitNum;
469✔
2833
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
469✔
2834
  rpcInit.readTimeout = tsReadTimeout;
469✔
2835
  rpcInit.ipv6 = tsEnableIpv6;
469✔
2836
  rpcInit.enableSSL = tsEnableTLS;
469✔
2837

2838
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
469✔
2839
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
469✔
2840
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
469✔
2841
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
469✔
2842
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
469✔
2843

2844
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
469✔
2845
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2846
    goto _OVER;
×
2847
  }
2848

2849
  clientRpc = rpcOpen(&rpcInit);
469✔
2850
  if (clientRpc == NULL) {
469✔
2851
    code = terrno;
×
2852
    tscError("failed to init server status client since %s", tstrerror(code));
×
2853
    goto _OVER;
×
2854
  }
2855

2856
  if (fqdn == NULL) {
469✔
2857
    fqdn = tsLocalFqdn;
469✔
2858
  }
2859

2860
  if (port == 0) {
469✔
2861
    port = tsServerPort;
469✔
2862
  }
2863

2864
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
469✔
2865
  epSet.eps[0].port = (uint16_t)port;
469✔
2866
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
469✔
2867
  if (TSDB_CODE_SUCCESS != ret) {
469✔
2868
    tscError("failed to send recv since %s", tstrerror(ret));
×
2869
    goto _OVER;
×
2870
  }
2871

2872
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
469✔
2873
    tscError("failed to send server status req since %s", terrstr());
134✔
2874
    goto _OVER;
134✔
2875
  }
2876

2877
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
335✔
2878
    tscError("failed to parse server status rsp since %s", terrstr());
×
2879
    goto _OVER;
×
2880
  }
2881

2882
  code = statusRsp.statusCode;
335✔
2883
  if (details != NULL) {
335✔
2884
    tstrncpy(details, statusRsp.details, maxlen);
335✔
2885
  }
2886

2887
_OVER:
401✔
2888
  if (clientRpc != NULL) {
469✔
2889
    rpcClose(clientRpc);
469✔
2890
  }
2891
  if (rpcRsp.pCont != NULL) {
469✔
2892
    rpcFreeCont(rpcRsp.pCont);
335✔
2893
  }
2894
  return code;
469✔
2895
}
2896

2897
// Adds one table name to the per-database request map.
//
// The name is given as one or two segments of `str`: when len2 > 0 the first segment (pos1/len1) is the
// database and the second (pos2/len2) the table; otherwise the first segment is the table and `db` supplies
// the database. The table is appended to the STablesReq stored in pHash under "<acctId>.<dbName>", creating
// that entry when it does not exist yet.
//
// Returns TSDB_CODE_SUCCESS, -1 for an empty/invalid name (including a missing default database), or an error
// code when an allocation or the hash insertion fails.
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
                      int32_t acctId, char* db) {
  SName name = {0};

  if (len1 <= 0) {
    return -1;
  }

  const char* dbName = db;
  const char* tbName = NULL;
  int32_t     dbLen = 0;
  int32_t     tbLen = 0;
  if (len2 > 0) {
    dbName = str + pos1;
    dbLen = len1;
    tbName = str + pos2;
    tbLen = len2;
  } else {
    // an unqualified table name needs the default database
    if (db == NULL) {
      return -1;
    }
    dbLen = strlen(db);
    tbName = str + pos1;
    tbLen = len1;
  }

  if (dbLen <= 0 || tbLen <= 0) {
    return -1;
  }

  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
    return -1;
  }

  if (tNameAddTbName(&name, tbName, tbLen)) {
    return -1;
  }

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);

  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
  if (pDb) {
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  // First table of this database: build a fully initialised entry. Its bytes are copied into the hash, so
  // the table array must be released here whenever the entry does not make it into the hash.
  STablesReq newReq = {0};
  newReq.pTables = taosArrayInit(20, sizeof(SName));
  if (NULL == newReq.pTables) {
    return terrno;
  }
  tstrncpy(newReq.dbFName, dbFName, TSDB_DB_FNAME_LEN);

  if (NULL == taosArrayPush(newReq.pTables, &name)) {
    int32_t code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    taosArrayDestroy(newReq.pTables);
    return code;
  }

  int32_t code = taosHashPut(pHash, dbFName, strlen(dbFName), &newReq, sizeof(newReq));
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(newReq.pTables);
    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2955

2956
// Parses a comma separated list of table names and groups the tables by database.
//
// Each item is `table` or `db.table`; a segment is either made of [A-Za-z0-9_] or enclosed in backticks, and
// blanks (space, \r, \t, \n) may follow a segment. Unqualified tables use `dbName`. On success *pReq receives
// a new array with one STablesReq per database; each entry owns its pTables array.
//
// Returns TSDB_CODE_SUCCESS, the terrno of a failed hash allocation, or TSDB_CODE_TSC_INVALID_OPERATION
// (also stored in terrno) for any parse or allocation failure afterwards. *pReq is only written on success.
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return terrno;
  }

  bool    inEscape = false;
  int32_t code = 0;
  void*   pIter = NULL;
  SArray* pList = NULL;  // result under construction; published through *pReq only on success
  int32_t dbNum = 0;

  // vPos/vLen describe up to two segments of the current item: [0] = first segment, [1] = segment after '.'
  int32_t vIdx = 0;
  int32_t vPos[2];
  int32_t vLen[2];

  (void)memset(vPos, -1, sizeof(vPos));
  (void)memset(vLen, 0, sizeof(vLen));

  for (int32_t i = 0;; ++i) {
    const char c = tbList[i];

    if (0 == c) {
      // end of input: close the open segment and flush the last item
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      break;
    }

    if ('`' == c) {
      inEscape = !inEscape;
      if (!inEscape) {
        // closing backtick: the escaped segment must not be empty
        if (vPos[vIdx] >= 0) {
          vLen[vIdx] = i - vPos[vIdx];
        } else {
          goto _return;
        }
      }

      continue;
    }

    if (inEscape) {
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    if ('.' == c) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      vIdx++;
      if (vIdx >= 2) {
        goto _return;
      }
      continue;
    }

    if (',' == c) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      (void)memset(vPos, -1, sizeof(vPos));
      (void)memset(vLen, 0, sizeof(vLen));
      vIdx = 0;
      continue;
    }

    if (' ' == c || '\r' == c || '\t' == c || '\n' == c) {
      // a blank terminates the segment that is currently open
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      continue;
    }

    if (('a' <= c && 'z' >= c) || ('A' <= c && 'Z' >= c) || ('0' <= c && '9' >= c) || ('_' == c)) {
      // more name characters after the segment was already closed, e.g. "a b"
      if (vLen[vIdx] > 0) {
        goto _return;
      }
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    goto _return;
  }

  dbNum = taosHashGetSize(pHash);
  pList = taosArrayInit(dbNum, sizeof(STablesReq));
  if (NULL == pList) {
    goto _return;
  }

  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    if (NULL == taosArrayPush(pList, pDb)) {
      goto _return;
    }
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  *pReq = pList;
  return TSDB_CODE_SUCCESS;

_return:

  terrno = TSDB_CODE_TSC_INVALID_OPERATION;

  // Entries already copied into pList share their pTables arrays with the hash entries, which are destroyed
  // below; only the list container itself is released here.
  if (pList != NULL) {
    taosArrayDestroy(pList);
  }

  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    taosArrayDestroy(pDb->pTables);
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  return terrno;
}
3094

3095
/*
 * Completion callback used when a catalog request is waited on synchronously.
 *
 * pResult  unused by this callback
 * param    the SSyncQueryParam of the waiting caller
 * code     result code, stored on the request referenced by param
 *
 * After recording the code, the semaphore inside param is posted.
 */
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
  SSyncQueryParam* pSync = (SSyncQueryParam*)param;

  pSync->pRequest->code = code;

  int32_t postCode = tsem_post(&pSync->sem);
  if (postCode != TSDB_CODE_SUCCESS) {
    tscError("failed to post semaphore since %s", tstrerror(terrno));
  }
}
1,252✔
3103

3104
void syncQueryFn(void* param, void* res, int32_t code) {
637,105,834✔
3105
  SSyncQueryParam* pParam = param;
637,105,834✔
3106
  pParam->pRequest = res;
637,105,834✔
3107

3108
  if (pParam->pRequest) {
637,107,951✔
3109
    pParam->pRequest->code = code;
637,091,328✔
3110
    clientOperateReport(pParam->pRequest);
637,103,356✔
3111
  }
3112

3113
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
637,065,009✔
3114
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3115
  }
3116
}
637,119,883✔
3117

3118
/*
 * Start an asynchronous query on connection connId.
 *
 * connId        id of the connection the request is built on
 * sql           NUL-terminated statement; must not be longer than tsMaxSQLLength
 * fp            completion callback; stored as the request's queryFp on success
 * param         opaque value passed to buildRequest and, on failure, to fp
 * validateOnly  forwarded to buildRequest
 * source        stored in the request's `source` field
 *
 * Every failure sets terrno and, when fp is non-NULL, reports it through
 * fp(param, NULL, terrno). On success the request is handed to doAsyncQuery().
 */
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                        int8_t source) {
  if (sql == NULL || NULL == fp) {
    terrno = TSDB_CODE_INVALID_PARA;
    if (fp) {
      fp(param, NULL, terrno);
    }

    return;
  }

  size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)tsMaxSQLLength) {
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, tsMaxSQLLength);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  SSessParam para = {.type = SESSION_MAX_CONCURRENCY, .value = 1};
  code = connUpdateSessMgtMetric(connId, &para);
  if (code != TSDB_CODE_SUCCESS) {
    // The request was already built but is never handed to doAsyncQuery() and the
    // callback only receives NULL, so nobody else can release it: release it here.
    // NOTE(review): destroyRequest() is assumed to be the release routine for a
    // request returned by buildRequest() - confirm.
    destroyRequest(pRequest);
    // Set terrno after the release so the reported error is the metric failure.
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->source = source;
  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
}
3159

3160
/*
 * Start an asynchronous query on connection connId using a caller-supplied
 * request id.
 *
 * connId        id of the connection the request is built on
 * sql           NUL-terminated statement; must not be longer than tsMaxSQLLength
 * fp            completion callback; stored as the request's queryFp on success
 * param         opaque value passed to buildRequest and, on failure, to fp
 * validateOnly  forwarded to buildRequest
 * reqid         request id forwarded to buildRequest
 *
 * Every failure sets terrno and, when fp is non-NULL, reports it through
 * fp(param, NULL, terrno). On success the request is handed to doAsyncQuery().
 */
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                                 int64_t reqid) {
  if (sql == NULL || NULL == fp) {
    terrno = TSDB_CODE_INVALID_PARA;
    if (fp) {
      fp(param, NULL, terrno);
    }

    return;
  }

  size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)tsMaxSQLLength) {
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid, tsMaxSQLLength);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
  if (code != TSDB_CODE_SUCCESS) {
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  SSessParam para = {.type = SESSION_MAX_CONCURRENCY, .value = 1};
  code = connUpdateSessMgtMetric(connId, &para);
  if (code != TSDB_CODE_SUCCESS) {
    // The request was already built but is never handed to doAsyncQuery() and the
    // callback only receives NULL, so nobody else can release it: release it here.
    // NOTE(review): destroyRequest() is assumed to be the release routine for a
    // request returned by buildRequest() - confirm.
    destroyRequest(pRequest);
    // Set terrno after the release so the reported error is the metric failure.
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->body.queryFp = fp;

  doAsyncQuery(pRequest, false);
}
3201

3202
/*
 * Apply a session metric update for the user of connection connId.
 *
 * Returns TSDB_CODE_INVALID_PARA when the connection cannot be acquired,
 * otherwise the result of sessMgtUpdateUserMetric(). The connection acquired
 * here is released again before returning.
 */
int32_t connUpdateSessMgtMetric(int64_t connId, SSessParam* pParam) {
  STscObj* pConn = acquireTscObj(connId);
  if (NULL == pConn) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t result = sessMgtUpdateUserMetric(pConn->user, pParam);

  releaseTscObj(connId);
  return result;
}
3215

3216
/*
 * Apply a session metric update for the user of an already-held connection
 * object.
 *
 * Returns TSDB_CODE_INVALID_PARA when pTscObj is NULL, otherwise the result
 * of sessMgtUpdateUserMetric().
 */
int32_t tscUpdateSessMgtMetric(STscObj* pTscObj, SSessParam* pParam) {
  if (NULL == pTscObj) {
    return TSDB_CODE_INVALID_PARA;
  }

  return sessMgtUpdateUserMetric(pTscObj->user, pParam);
}
3227

3228
/*
 * Run a query synchronously on top of taosAsyncQueryImpl().
 *
 * taos          connection handle; dereferenced as an int64_t connection id
 * sql           statement text, forwarded unchanged
 * validateOnly  forwarded unchanged
 * source        forwarded unchanged
 *
 * A temporary SSyncQueryParam carries a semaphore that syncQueryFn posts when
 * the query completes. Returns the request published by the callback, or NULL
 * when taos is NULL (terrno = TSDB_CODE_TSC_DISCONNECTED), when the parameter
 * block or semaphore cannot be set up, when waiting fails, or when the
 * callback delivered no request.
 */
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
  if (taos == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* pSync = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (pSync == NULL) {
    return NULL;
  }

  int32_t rc = tsem_init(&pSync->sem, 0, 0);
  if (rc != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, pSync, validateOnly, source);

  rc = tsem_wait(&pSync->sem);
  if (rc != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  rc = tsem_destroy(&pSync->sem);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to destroy semaphore since %s", tstrerror(rc));
  }

  SRequestObj* pResult = pSync->pRequest;
  if (pResult != NULL) {
    pResult->syncQuery = true;
    // Clear the in-callback flag before the request is returned to the caller.
    pResult->inCallback = false;
  }

  taosMemoryFree(pSync);
  return pResult;
}
3269

3270
/*
 * Run a query synchronously with a caller-supplied request id, on top of
 * taosAsyncQueryImplWithReqid().
 *
 * taos          connection handle; dereferenced as an int64_t connection id
 * sql           statement text, forwarded unchanged
 * validateOnly  forwarded unchanged
 * reqid         request id, forwarded unchanged
 *
 * A temporary SSyncQueryParam carries a semaphore that syncQueryFn posts when
 * the query completes. Returns the request published by the callback, or NULL
 * when taos is NULL (terrno = TSDB_CODE_TSC_DISCONNECTED), when the parameter
 * block or semaphore cannot be set up, when waiting fails, or when the
 * callback delivered no request.
 */
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (param == NULL) {
    return NULL;
  }
  int32_t code = tsem_init(&param->sem, 0, 0);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
  code = tsem_wait(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  // Release the semaphore initialised above before its storage is freed,
  // matching taosQueryImpl().
  code = tsem_destroy(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  SRequestObj* pRequest = NULL;
  if (param->pRequest != NULL) {
    param->pRequest->syncQuery = true;
    pRequest = param->pRequest;
    // Clear the in-callback flag before the request is returned to the caller,
    // matching taosQueryImpl().
    param->pRequest->inCallback = false;
  }
  taosMemoryFree(param);

  return pRequest;
}
3304

3305
static void fetchCallback(void* pResult, void* param, int32_t code) {
90,268,930✔
3306
  SRequestObj* pRequest = (SRequestObj*)param;
90,268,930✔
3307

3308
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
90,268,930✔
3309

3310
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
90,268,930✔
3311
           tstrerror(code), pRequest->requestId);
3312

3313
  pResultInfo->pData = pResult;
90,268,536✔
3314
  pResultInfo->numOfRows = 0;
90,268,930✔
3315

3316
  if (code != TSDB_CODE_SUCCESS) {
90,268,289✔
3317
    pRequest->code = code;
×
3318
    taosMemoryFreeClear(pResultInfo->pData);
×
3319
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3320
    return;
×
3321
  }
3322

3323
  if (pRequest->code != TSDB_CODE_SUCCESS) {
90,268,289✔
3324
    taosMemoryFreeClear(pResultInfo->pData);
×
3325
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3326
    return;
×
3327
  }
3328

3329
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
91,189,269✔
3330
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
90,268,707✔
3331
  if (pRequest->code != TSDB_CODE_SUCCESS) {
90,268,512✔
3332
    pResultInfo->numOfRows = 0;
77✔
3333
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
77✔
3334
             tstrerror(pRequest->code), pRequest->requestId);
3335
  } else {
3336
    tscDebug(
90,267,357✔
3337
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3338
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3339

3340
    STscObj*            pTscObj = pRequest->pTscObj;
90,267,357✔
3341
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
90,268,407✔
3342
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
90,268,435✔
3343
  }
3344

3345
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
90,268,930✔
3346
}
3347

3348
/*
 * Asynchronously fetch the next batch of rows for pRequest.
 *
 * pRequest  request whose results are fetched
 * fp        callback stored as the request's fetchFp
 * param     user value stored in the request's interParam and passed to fp
 *
 * fp is called directly when the request has no fields, carries an error, or
 * has already completed. Otherwise a scheduler fetch is issued with
 * fetchCallback as its completion handler.
 */
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
  SReqResultInfo* pInfo = &pRequest->body.resInfo;

  pRequest->body.fetchFp = fp;
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;

  // Nothing to fetch: the query has no result columns or has already failed.
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    pInfo->numOfRows = 0;
    pRequest->body.fetchFp(param, pRequest, pInfo->numOfRows);
    return;
  }

  // Everything has already been delivered; no scheduler round trip is needed.
  if (pInfo->completed) {
    if (QUERY_EXEC_MODE_SCHEDULE == pRequest->body.execMode) {
      pInfo->numOfRows = 0;
    } else if (!pInfo->localResultFetched) {
      // First fetch of a non-scheduled result: numOfRows is left untouched and
      // the result is only marked as fetched.
      pInfo->localResultFetched = true;
    } else {
      pInfo->numOfRows = 0;
      pInfo->current = 0;
    }

    pRequest->body.fetchFp(param, pRequest, pInfo->numOfRows);
    return;
  }

  SSchedulerReq fetchReq = {
      .cbParam = pRequest,
      .fetchFp = fetchCallback,
      .syncReq = false,
  };

  int32_t rc = schedulerFetchRows(pRequest->body.queryJob, &fetchReq);
  if (rc != TSDB_CODE_SUCCESS) {
    // NOTE: fp is not invoked on this path; the failure is only logged.
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
  }
}
3391

3392
/*
 * Deliver the final result of a query to the user's queryFp callback.
 *
 * pRequest  the finished request
 * code      result code; with tsQueryTbNotExistAsEmpty set, a "table not exist"
 *           error on a query request is turned into success with the request
 *           type set to TSDB_SQL_RETRIEVE_EMPTY_RESULT
 *
 * inCallback is set for the duration of the callback and cleared afterwards if
 * the request can still be acquired by its id.
 */
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
  // Remember the id up front: after the callback the request is looked up by id
  // again instead of through pRequest (presumably because the callback may
  // release it - confirm).
  int64_t rid = pRequest->self;
  pRequest->inCallback = true;

  bool tableMissing = (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) || (TSDB_CODE_TDB_TABLE_NOT_EXIST == code);
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && tableMissing) {
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    code = TSDB_CODE_SUCCESS;
  }

  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
           pRequest);

  if (NULL != pRequest->body.queryFp) {
    SSyncQueryParam* pInter = (SSyncQueryParam*)pRequest->body.interParam;
    pRequest->body.queryFp(pInter->userParam, pRequest, code);
  }

  SRequestObj* pLive = acquireRequest(rid);
  if (NULL == pLive) {
    return;
  }

  pLive->inCallback = false;
  (void)releaseRequest(rid);
}
637,097,623✔
3415

3416
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
553,846✔
3417
                       SParseSqlRes* pRes) {
3418
#ifndef TD_ENTERPRISE
3419
  return TSDB_CODE_SUCCESS;
3420
#else
3421
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
553,846✔
3422
#endif
3423
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc