• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4998

21 Mar 2026 01:22PM UTC coverage: 72.335% (+0.6%) from 71.739%
#4998

push

travis-ci

web-flow
enh:register add secondEp (#34867)

61 of 69 new or added lines in 1 file covered. (88.41%)

8670 existing lines in 142 files now uncovered.

253516 of 350475 relevant lines covered (72.33%)

133451446.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.62
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "command.h"
22
#include "decimal.h"
23
#include "scheduler.h"
24
#include "tdatablock.h"
25
#include "tdataformat.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tmisce.h"
29
#include "tmsg.h"
30
#include "tmsgtype.h"
31
#include "tpagedbuf.h"
32
#include "tref.h"
33
#include "tsched.h"
34
#include "tversion.h"
35

36
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
37
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode);
38

39
void setQueryRequest(int64_t rId) {
742,505,202✔
40
  SRequestObj* pReq = acquireRequest(rId);
742,505,202✔
41
  if (pReq != NULL) {
742,575,325✔
42
    pReq->isQuery = true;
742,557,454✔
43
    (void)releaseRequest(rId);
742,557,972✔
44
  }
45
}
742,541,215✔
46

47
// Returns true when str is non-NULL, non-empty and at most maxsize characters long
// (terminator not counted); false otherwise.
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (NULL == str) {
    return false;
  }

  const size_t n = strlen(str);
  return (n > 0) && (n <= maxsize);
}
59

60
// A user name is accepted when its length is between 1 and TSDB_USER_LEN - 1 characters.
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}
102,544,881✔
61

62
// A password is accepted when its length is between 1 and TSDB_PASSWORD_MAX_LEN characters.
static bool validatePassword(const char* passwd) {
  const size_t maxLen = TSDB_PASSWORD_MAX_LEN;
  return stringLengthCheck(passwd, maxLen);
}
102,546,958✔
63

64
// A database name is accepted when its length is between 1 and TSDB_DB_NAME_LEN - 1 characters.
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}
568,560✔
65

66
// Builds the heap-allocated lookup key for an app instance:
//   "<auth>:<ip>:<port>"         when user is NULL
//   "<user>:<auth>:<ip>:<port>"  otherwise
// The key is formatted into a 512-byte buffer (longer input is truncated by snprintf) and
// duplicated with taosStrdup; the caller owns the returned string. Returns the taosStrdup
// result, which the caller checks against NULL.
//
// auth and ip may be NULL (taos_connect_by_auth passes ip == NULL when it falls back to the
// configured endpoints). Passing NULL to "%s" is undefined behaviour, so NULL is replaced by
// the literal "(null)", which is what glibc prints and therefore keeps existing keys stable.
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  const char* authStr = (auth != NULL) ? auth : "(null)";
  const char* ipStr = (ip != NULL) ? ip : "(null)";

  char key[512] = {0};
  if (user == NULL) {
    (void)snprintf(key, sizeof(key), "%s:%s:%d", authStr, ipStr, port);
  } else {
    (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, authStr, ipStr, port);
  }
  return taosStrdup(key);
}
75

76
// Copies srcLength bytes of src into dst, replacing the characters
//   "  \  \b  \f  \n  \r  \t
// by their two-character backslash escape sequences.
//
// dst           output buffer (not NUL-terminated by this function)
// maxDstLength  maximum number of bytes that may be written to dst
//               (assumed to be the capacity of dst -- TODO confirm against callers)
// src/srcLength input bytes
//
// Returns the number of bytes written, or 0 when dst or src is NULL or srcLength is 0.
// Output is never written past maxDstLength: conversion stops before the first character
// (or escape pair) that no longer fits, so an escape sequence is never split.
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
  if (dst == NULL || src == NULL || srcLength == 0) {
    return 0;
  }

  size_t dstLength = 0;
  for (size_t i = 0; i < srcLength; ++i) {
    char escaped = 0;  // second character of the escape pair, 0 when no escaping is needed
    switch (src[i]) {
      case '\"':
        escaped = '\"';
        break;
      case '\\':
        escaped = '\\';
        break;
      case '\b':
        escaped = 'b';
        break;
      case '\f':
        escaped = 'f';
        break;
      case '\n':
        escaped = 'n';
        break;
      case '\r':
        escaped = 'r';
        break;
      case '\t':
        escaped = 't';
        break;
      default:
        break;
    }

    const size_t needed = (escaped != 0) ? 2 : 1;
    // dstLength never exceeds maxDstLength, so the subtraction cannot wrap
    if (needed > maxDstLength - dstLength) {
      break;
    }

    if (escaped != 0) {
      dst[dstLength++] = '\\';
      dst[dstLength++] = escaped;
    } else {
      dst[dstLength++] = src[i];
    }
  }

  return (int32_t)dstLength;
}
132

133
bool chkRequestKilled(void* param) {
2,147,483,647✔
134
  bool         killed = false;
2,147,483,647✔
135
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
136
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647✔
137
    killed = true;
2,758✔
138
  }
139

140
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
141

142
  return killed;
2,147,483,647✔
143
}
144

145
void cleanupAppInfo() {
1,376,341✔
146
  taosHashCleanup(appInfo.pInstMap);
1,376,341✔
147
  taosHashCleanup(appInfo.pInstMapByClusterId);
1,376,341✔
148
  tscInfo("cluster instance map cleaned");
1,376,341✔
149
}
1,376,341✔
150

151
static int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db,
152
                               __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType,
153
                               STscObj** pTscObj);
154

155
// Opens a connection using an already-encrypted auth string, or a token when user is NULL.
//
// ip, port  target endpoint. When ip is NULL the configured tsFirst/tsSecond endpoints are
//           used; when port is 0 the ports of the endpoint set are left untouched.
// user      user name, or NULL for token login (auth must then be exactly
//           TSDB_TOKEN_LEN - 1 characters long).
// auth      auth string forwarded to the transporter and to taosConnectImpl.
// totp      optional decimal TOTP code in [0, 999999]; NULL means "not supplied" (-1 is used).
// db        optional database name; it is copied and dequoted before use.
// connType  forwarded unchanged to taosConnectImpl.
// pObj      forwarded to taosConnectImpl, which produces the connection object.
//
// The app instance for the (user, auth, ip, port) key is looked up, or created and registered,
// under appInfo.mutex. Returns TSDB_CODE_SUCCESS or an error code.
int32_t taos_connect_by_auth(const char* ip, const char* user, const char* auth, const char* totp, const char* db,
                             uint16_t port, int connType, STscObj** pObj) {
  TSC_ERR_RET(taos_init());

  if (user == NULL) {
    if (auth == NULL || strlen(auth) != (TSDB_TOKEN_LEN - 1)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOKEN);
    }
  } else if (!validateUserName(user)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
  }
  int32_t code = 0;

  char localDb[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL && strlen(db) > 0) {
    if (!validateDbName(db)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
    }

    tstrncpy(localDb, db, sizeof(localDb));
    (void)strdequote(localDb);
  }

  int32_t totpCode = -1;
  if (totp != NULL) {
    char* endptr = NULL;
    totpCode = taosStr2Int32(totp, &endptr, 10);
    // reject empty input, trailing garbage and values outside the 6-digit range
    if (endptr == totp || *endptr != '\0' || totpCode < 0 || totpCode > 999999) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOTP_CODE);
    }
  }

  SCorEpSet epSet = {0};
  if (ip) {
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
  } else {
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
  }

  if (port) {
    epSet.epSet.eps[0].port = port;
    epSet.epSet.eps[1].port = port;
  }

  char* key = getClusterKey(user, auth, ip, port);
  if (NULL == key) {
    TSC_ERR_RET(terrno);
  }
  // db may be NULL here; never hand NULL to "%s"
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
          user ? user : "", db ? db : "(null)", key);
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
  }

  // Instance handed to taosConnectImpl. It is captured while appInfo.mutex is held so that the
  // hash-map slot returned by taosHashGet is not dereferenced after the lock has been released.
  SAppInstInfo* pAppInst = NULL;

  code = taosThreadMutexLock(&appInfo.mutex);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    taosMemoryFree(key);  // the key is not owned by any instance yet; do not leak it
    TSC_ERR_RET(code);
  }

  SAppInstInfo** pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
  if (pInst == NULL) {
    // first connection for this key: create and register a new app instance
    SAppInstInfo* p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
    if (NULL == p) {
      TSC_ERR_JRET(terrno);
    }
    p->mgmtEp = epSet;
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = openTransporter(user, auth, tsNumOfCores / 2, &p->pTransporter);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    p->instKey = key;  // ownership of key moves to the instance
    key = NULL;
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user ? user : "", epSet.epSet.eps[0].fqdn,
            epSet.epSet.eps[0].port);

    pAppInst = p;
  } else {
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
    }
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
    pAppInst = *pInst;
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    (void)taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    return code;
  }

  code = taosThreadMutexUnlock(&appInfo.mutex);
  taosMemoryFreeClear(key);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    return code;
  }
  return taosConnectImpl(user, auth, totpCode, localDb, NULL, NULL, pAppInst, connType, pObj);
}
275

276
// Password-based connect: validates the plain-text password, encrypts it into an auth
// string and delegates to taos_connect_by_auth with all other arguments unchanged.
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
                              uint16_t port, int connType, STscObj** pObj) {
  if (!validatePassword(pass)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
  }

  char encrypted[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t*)pass, strlen(pass), encrypted);

  return taos_connect_by_auth(ip, user, encrypted, totp, db, port, connType, pObj);
}
286

287
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
288
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
289
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
290
//     return *ppAppInstInfo;
291
//   } else {
292
//     return NULL;
293
//   }
294
// }
295

296
// Destroys the semaphore embedded in param and frees param itself.
// A NULL param is ignored; a failing semaphore destroy is logged and the memory is still freed.
void freeQueryParam(SSyncQueryParam* param) {
  if (NULL != param) {
    if (tsem_destroy(&param->sem) != TSDB_CODE_SUCCESS) {
      tscError("failed to destroy semaphore in freeQueryParam");
    }
    taosMemoryFree(param);
  }
}
303

304
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
958,462,536✔
305
                     SRequestObj** pRequest, int64_t reqid) {
306
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
958,462,536✔
307
  if (TSDB_CODE_SUCCESS != code) {
958,467,430✔
308
    tscError("failed to malloc sqlObj, %s", sql);
×
309
    return code;
×
310
  }
311

312
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
958,467,430✔
313
  if ((*pRequest)->sqlstr == NULL) {
958,463,571✔
314
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
315
    destroyRequest(*pRequest);
×
316
    *pRequest = NULL;
×
317
    return terrno;
×
318
  }
319

320
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
958,462,746✔
321
  (*pRequest)->sqlstr[sqlLen] = 0;
958,479,910✔
322
  (*pRequest)->sqlLen = sqlLen;
958,477,581✔
323
  (*pRequest)->validateOnly = validateSql;
958,477,378✔
324
  (*pRequest)->stmtBindVersion = 0;
958,475,085✔
325

326
  code = sqlSecurityCheckStringLevel(*pRequest, (*pRequest)->sqlstr, (*pRequest)->sqlLen);
958,475,944✔
327
  if (code != TSDB_CODE_SUCCESS) {
958,455,761✔
328
    tscWarn("req:0x%" PRIx64 ", sql security string check failed, QID:0x%" PRIx64 ", code:%s", (*pRequest)->self,
1,563✔
329
            (*pRequest)->requestId, tstrerror(code));
330
    destroyRequest(*pRequest);
1,563✔
331
    *pRequest = NULL;
1,563✔
332
    return code;
1,563✔
333
  }
334

335
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
958,454,198✔
336

337
  STscObj* pTscObj = (*pRequest)->pTscObj;
958,477,491✔
338
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
958,473,572✔
339
                             sizeof((*pRequest)->self));
340
  if (err) {
958,473,153✔
341
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
342
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
343
    destroyRequest(*pRequest);
×
344
    *pRequest = NULL;
×
345
    return terrno;
×
346
  }
347

348
  (*pRequest)->allocatorRefId = -1;
958,473,153✔
349
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
958,477,405✔
350
    if (TSDB_CODE_SUCCESS !=
484,428,040✔
351
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
484,411,118✔
352
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
353
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
354
      destroyRequest(*pRequest);
×
355
      *pRequest = NULL;
×
356
      return terrno;
×
357
    }
358
  }
359

360
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
958,488,052✔
361
  return TSDB_CODE_SUCCESS;
958,467,213✔
362
}
363

364
// Builds a sub-request for sql on the same connection as pRequest and links the two:
// pRequest records the new request as its predecessor, and the new request records pRequest
// as both its successor and its user request. Returns the buildRequest result.
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
  int32_t code =
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pPrev = *pNewRequest;
  pRequest->relation.prevRefId = pPrev->self;
  pPrev->relation.nextRefId = pRequest->self;
  pPrev->relation.userRefId = pRequest->self;
  pPrev->isSubReq = true;
  return code;
}
375

376
// Parses the SQL text stored in pRequest into *pQuery.
// On a successful parse with a result set, the result schema and precision are copied into the
// request. When parsing succeeded, or failed with an error the client must handle itself, the
// db/table/target-table lists are swapped from the query into the request.
// Returns the catalog, parser or schema-setup error code, or TSDB_CODE_SUCCESS.
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj* pTscObj = pRequest->pTscObj;

  SParseContext cxt = {
      .requestId = pRequest->requestId,
      .requestRid = pRequest->self,
      .acctId = pTscObj->acctId,
      .db = pRequest->pDb,
      .topicQuery = topicQuery,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pTscObj->pAppInfo->pTransporter,
      .pStmtCb = pStmtCb,
      .pUser = pTscObj->user,
      .userId = pTscObj->userId,
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
      .enableSysInfo = pTscObj->sysInfo,
      .svrVer = pTscObj->sVer,
      .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
      .stmtBindVersion = pRequest->stmtBindVersion,
      .setQueryFp = setQueryRequest,
      .timezone = pTscObj->optionInfo.timezone,
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
  };
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = qParseSql(&cxt, pQuery);
  if (TSDB_CODE_SUCCESS == code && (*pQuery)->haveResultSet) {
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    code = setResSchemaInfo(pResInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols, (*pQuery)->pResExtSchema,
                            pRequest->stmtBindVersion > 0);
    setResPrecision(pResInfo, (*pQuery)->precision);
  }

  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
  }

  // position arrays filled in by the parser are only needed during parsing
  taosArrayDestroy(cxt.pTableMetaPos);
  taosArrayDestroy(cxt.pTableVgroupPos);

  return code;
}
429

430
// Executes a command locally via qExecCommand and, when it succeeds and yields a response,
// installs that response as the request's query result.
// Returns the qExecCommand code, or the setQueryResultFromRsp code when a response was installed.
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  STscObj*           pTscObj = pRequest->pTscObj;
  SReqResultInfo*    pResInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRsp = NULL;

  int8_t  biMode = atomic_load_8(&pTscObj->biMode);
  int32_t code =
      qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode, pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS != code || NULL == pRsp) {
    return code;
  }

  return setQueryResultFromRsp(pResInfo, pRsp, pResInfo->convertUcs4, pRequest->stmtBindVersion > 0);
}
442

443
// Sends the DDL message carried by pQuery to the server and blocks on the request's response
// semaphore. A query without a command message (e.g. "drop table if exists" on a missing
// table) succeeds immediately.
// Returns TSDB_CODE_SUCCESS, or the error from sending or from waiting on the semaphore.
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (NULL == pCmd) {
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pCmd->epSet, NULL, pSendInfo));
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
  return TSDB_CODE_SUCCESS;
}
462

463
// Returns the app instance info of the connection that owns pRequest.
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
  STscObj* pTscObj = pRequest->pTscObj;
  return pTscObj->pAppInfo;
}
1,532,514,400✔
464

465
// Asynchronous variant of local command execution: runs the command, stores any response as
// the request's result, records the outcome in pRequest->code and always finishes by invoking
// the request callback. Validate-only requests invoke the callback with 0 without executing.
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  STscObj*           pTscObj = pRequest->pTscObj;
  SReqResultInfo*    pResultInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRsp = NULL;

  int32_t code = qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, atomic_load_8(&pTscObj->biMode),
                              pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
    code = setQueryResultFromRsp(pResultInfo, pRsp, pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
  }

  pRequest->code = code;
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    tscDebug(
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
  } else {
    pResultInfo->numOfRows = 0;
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
             pRequest->requestId);
  }

  doRequestCallback(pRequest, code);
}
494

495
// Sends the DDL message carried by pQuery to the server without waiting for the reply.
// Validate-only requests and queries without a command message (e.g. "drop table if exists"
// on a missing table) complete immediately through the request callback with code 0.
// When sending fails, the callback is invoked with the error, which is also returned.
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly || NULL == pQuery->pCmdMsg) {
    doRequestCallback(pRequest, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pCmd->epSet, NULL, pSendInfo);
  if (code) {
    doRequestCallback(pRequest, code);
  }
  return code;
}
521

522
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
109,277✔
523
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
109,277✔
524
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
109,277✔
525

526
  if (node1->load < node2->load) {
109,277✔
527
    return -1;
×
528
  }
529

530
  return node1->load > node2->load;
109,277✔
531
}
532

533
// Replaces the cached qnode list of pInfo with a load-sorted copy of pNodeList.
// The previous list is always destroyed; when pNodeList is NULL the cache is left empty.
// All changes happen under pInfo->qnodeMutex.
//
// Returns TSDB_CODE_SUCCESS, a mutex lock/unlock error, or terrno when the copy could not be
// made (taosArrayDup is assumed to return NULL and set terrno on failure, as the other array
// helpers in this file do -- TODO confirm). In that case the cache is left empty.
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  int32_t code = TSDB_CODE_SUCCESS;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    taosArrayDestroy(pInfo->pQnodeList);
    pInfo->pQnodeList = NULL;
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
  }

  if (pNodeList) {
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
    if (NULL == pInfo->pQnodeList) {
      // do not sort or measure a list that was never created
      code = terrno;
    } else {
      taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
      // cast so the argument matches "%ld" whatever taosArrayGetSize returns
      tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
               (long)taosArrayGetSize(pInfo->pQnodeList));
    }
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));

  return code;
}
551

552
// Tells the caller whether a qnode list still has to be fetched for this request.
// *required is false under the vnode and client query policies; under any other policy it is
// true exactly when the app instance has no cached qnode list.
// Returns TSDB_CODE_SUCCESS or a mutex lock/unlock error.
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
  *required = false;

  const bool qnodeUnused = (QUERY_POLICY_VNODE == tsQueryPolicy) || (QUERY_POLICY_CLIENT == tsQueryPolicy);
  if (qnodeUnused) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pAppInst = pRequest->pTscObj->pAppInfo;

  TSC_ERR_RET(taosThreadMutexLock(&pAppInst->qnodeMutex));
  *required = (NULL == pAppInst->pQnodeList);
  TSC_ERR_RET(taosThreadMutexUnlock(&pAppInst->qnodeMutex));

  return TSDB_CODE_SUCCESS;
}
567

568
// Produces a qnode list for the request in *pNodeList.
// When the app instance has a cached list, a copy of it is returned. Otherwise a new list is
// created, filled from the catalog and, on success, stored as the new cache via updateQnodeList.
//
// *pNodeList is read before it is assigned when no cached list exists, so the caller must
// pass it initialised to NULL.
// Returns TSDB_CODE_SUCCESS or an error code (mutex, catalog or allocation failure).
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
  if (NULL == *pNodeList) {
    SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
      // test the freshly created array, not the out-parameter itself
      if (NULL == *pNodeList) {
        return terrno;
      }
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
    }

    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
      code = updateQnodeList(pInfo, *pNodeList);
    }
  }

  return code;
}
599

600
// Records the query's message type on the request and asks the planner for a query plan,
// using a plan context populated from the request, its connection and its app instance.
// Returns the qCreateQueryPlan result.
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
  STscObj*      pTscObj = pRequest->pTscObj;

  SPlanContext planCxt = {
      .queryId = pRequest->requestId,
      .acctId = pTscObj->acctId,
      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
      .pAstRoot = pQuery->pRoot,
      .showRewrite = pQuery->showRewrite,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pUser = pTscObj->user,
      .userId = pTscObj->userId,
      .timezone = pTscObj->optionInfo.timezone,
      .sysInfo = pTscObj->sysInfo,
  };

  return qCreateQueryPlan(&planCxt, pPlan, pNodeList);
}
618

619
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
248,609,718✔
620
                         const SExtSchema* pExtSchema, bool isStmt) {
621
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
248,609,718✔
622
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
623
    return TSDB_CODE_INVALID_PARA;
×
624
  }
625

626
  pResInfo->numOfCols = numOfCols;
248,637,203✔
627
  if (pResInfo->fields != NULL) {
248,639,028✔
628
    taosMemoryFree(pResInfo->fields);
16,026✔
629
  }
630
  if (pResInfo->userFields != NULL) {
248,627,739✔
631
    taosMemoryFree(pResInfo->userFields);
16,026✔
632
  }
633
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
248,629,246✔
634
  if (NULL == pResInfo->fields) return terrno;
248,602,330✔
635
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
248,604,843✔
636
  if (NULL == pResInfo->userFields) {
248,614,613✔
637
    taosMemoryFree(pResInfo->fields);
×
638
    return terrno;
×
639
  }
640
  if (numOfCols != pResInfo->numOfCols) {
248,607,268✔
641
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
642
    return TSDB_CODE_FAILED;
×
643
  }
644

645
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
1,174,879,066✔
646
    pResInfo->fields[i].type = pSchema[i].type;
926,249,119✔
647

648
    pResInfo->userFields[i].type = pSchema[i].type;
926,246,743✔
649
    // userFields must convert to type bytes, no matter isStmt or not
650
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
926,249,797✔
651
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
926,241,502✔
652
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
926,255,825✔
653
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
1,399,373✔
654
    }
655

656
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
926,264,058✔
657
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
926,277,807✔
658
  }
659
  return TSDB_CODE_SUCCESS;
248,657,453✔
660
}
661

662
// Stores precision in pResInfo when it is one of the milli, micro or nano time precisions;
// any other value is ignored and the stored precision stays unchanged.
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  const bool recognised = (precision == TSDB_TIME_PRECISION_MILLI) || (precision == TSDB_TIME_PRECISION_MICRO) ||
                          (precision == TSDB_TIME_PRECISION_NANO);
  if (recognised) {
    pResInfo->precision = precision;
  }
}
670

671
// Builds the execution node list for the vnode/client query policies.
// Every vgroup found in pDbVgList (an array of per-database vgroup arrays) contributes one
// entry. When no vgroup is found, the mnode list is used instead; when that is empty too, an
// empty list is returned. On success *pNodeList receives the new list.
// Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or TSDB_CODE_OUT_OF_RANGE when an
// array element cannot be fetched; on failure the partial list is destroyed.
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
  SArray* pLoads = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pLoads) {
    return terrno;
  }
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";

  int32_t numOfDbs = taosArrayGetSize(pDbVgList);
  for (int32_t dbIdx = 0; dbIdx < numOfDbs; ++dbIdx) {
    SArray* pVgroups = taosArrayGetP(pDbVgList, dbIdx);
    if (NULL == pVgroups) {
      continue;
    }

    // an empty vgroup array simply contributes nothing
    int32_t numOfVgroups = taosArrayGetSize(pVgroups);
    for (int32_t vgIdx = 0; vgIdx < numOfVgroups; ++vgIdx) {
      SVgroupInfo* pVgroup = taosArrayGet(pVgroups, vgIdx);
      if (NULL == pVgroup) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }

      SQueryNodeLoad entry = {0};
      entry.addr.nodeId = pVgroup->vgId;
      entry.addr.epSet = pVgroup->epSet;
      if (NULL == taosArrayPush(pLoads, &entry)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }
    }
  }

  int32_t vnodeNum = taosArrayGetSize(pLoads);
  if (vnodeNum > 0) {
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
  } else {
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirstMnode) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pLoads, pFirstMnode, mnodeNum)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
    }
  }

  *pNodeList = pLoads;

  return TSDB_CODE_SUCCESS;
}
736

737
// Builds the execution node list for the qnode query policy.
// The qnode list is used when it is non-empty, otherwise the mnode list; when both are empty
// an empty list is returned. On success *pNodeList receives the new list.
// Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or TSDB_CODE_OUT_OF_RANGE when the
// first element of the chosen list cannot be fetched; on failure the new list is destroyed.
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
  SArray* pLoads = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pLoads) {
    return terrno;
  }

  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
  if (qNodeNum > 0) {
    void* pFirstQnode = taosArrayGet(pQnodeList, 0);
    if (NULL == pFirstQnode) {
      taosArrayDestroy(pLoads);
      return TSDB_CODE_OUT_OF_RANGE;
    }
    if (NULL == taosArrayAddBatch(pLoads, pFirstQnode, qNodeNum)) {
      taosArrayDestroy(pLoads);
      return terrno;
    }
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
  } else {
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirstMnode) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pLoads, pFirstMnode, mnodeNum)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
    }
  }

  *pNodeList = pLoads;

  return TSDB_CODE_SUCCESS;
}
782

783
void freeVgList(void* list) {
8,513,862✔
784
  SArray* pList = *(SArray**)list;
8,513,862✔
785
  taosArrayDestroy(pList);
8,518,172✔
786
}
8,507,475✔
787

788
// Build the execution node list for an asynchronous query according to tsQueryPolicy.
//
// pRequest    - the request being scheduled
// pNodeList   - output, filled by buildVnodePolicyNodeList / buildQnodePolicyNodeList
// pMnodeList  - mnode list passed through to the policy builders
// pResultMeta - optional metadata already fetched for the request; when present its
//               db-vgroup / qnode results are used instead of querying the catalog
//               or the app-level qnode cache
//
// Returns TSDB_CODE_SUCCESS or an error code. All temporary lists created here are
// released before returning.
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  FDelete fp = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      if (pResultMeta) {
        // the vgroup arrays pushed here are taken from pResultMeta and are not freed
        // by this function (fp stays NULL)
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
        for (int32_t i = 0; i < dbNum; ++i) {
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
          if (pRes->code || NULL == pRes->pRes) {
            continue;
          }

          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
            code = terrno;
            goto _return;
          }
        }
      } else {
        // the vgroup arrays are fetched from the catalog below and freed with the outer list
        fp = freeVgList;

        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
        if (dbNum > 0) {
          SCatalog*     pCtg = NULL;
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
          code = catalogGetHandle(pInst->clusterId, &pCtg);
          if (code != TSDB_CODE_SUCCESS) {
            goto _return;
          }

          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
          if (NULL == pDbVgList) {
            code = terrno;
            goto _return;
          }
          for (int32_t i = 0; i < dbNum; ++i) {
            SArray*          pVgList = NULL;
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                     .requestId = pRequest->requestId,
                                     .requestObjRefId = pRequest->self,
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

            // catalogGetDBVgList will handle dbFName == null.
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
            if (code) {
              goto _return;
            }

            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
              code = terrno;
              // pVgList never made it into pDbVgList, so the cleanup at _return cannot
              // reach it; release it here to avoid a leak
              taosArrayDestroy(pVgList);
              goto _return;
            }
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
        if (pRes->code) {
          pQnodeList = NULL;
        } else {
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            goto _return;
          }
        }
      } else {
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
        if (pInst->pQnodeList) {
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
          if (NULL == pQnodeList) {
            // remember the failure but fall through: the mutex must be released
            // on every path once it has been locked
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
          }
        }
        int32_t unlockCode = taosThreadMutexUnlock(&pInst->qnodeMutex);
        if (TSDB_CODE_SUCCESS == code) {
          code = unlockCode;
        }
        if (TSDB_CODE_SUCCESS != code) {
          terrno = code;
          goto _return;
        }
      }

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:
  taosArrayDestroyEx(pDbVgList, fp);
  taosArrayDestroy(pQnodeList);

  return code;
}
897

898
// Build the execution node list for a synchronous query according to tsQueryPolicy.
//
// For the vnode/client policies the vgroup list of every database referenced by the
// request is fetched from the catalog; for the qnode/hybrid policies the qnode list is
// obtained through getQnodeList. The result is stored in *pNodeList by the policy
// builder. Returns TSDB_CODE_SUCCESS or an error code; all temporary lists created
// here are released before returning.
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
      if (dbNum > 0) {
        SCatalog*     pCtg = NULL;
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        code = catalogGetHandle(pInst->clusterId, &pCtg);
        if (code != TSDB_CODE_SUCCESS) {
          goto _return;
        }

        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        for (int32_t i = 0; i < dbNum; ++i) {
          SArray*          pVgList = NULL;
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                   .requestId = pRequest->requestId,
                                   .requestObjRefId = pRequest->self,
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

          // catalogGetDBVgList will handle dbFName == null.
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
          if (code) {
            goto _return;
          }

          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
            code = terrno;
            // pVgList is not owned by pDbVgList yet, so the cleanup at _return would
            // miss it; release it here to avoid a leak
            taosArrayDestroy(pVgList);
            goto _return;
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:

  // each element of pDbVgList is a vgroup array fetched above; freeVgList destroys it
  taosArrayDestroyEx(pDbVgList, freeVgList);
  taosArrayDestroy(pQnodeList);

  return code;
}
963

964
// Run a query plan synchronously through the scheduler.
//
// The execution result is moved into pRequest->body.resInfo.execRes (replacing whatever
// was stored there). The final status is written to both pRequest->code and terrno and
// is also returned. For submit/delete/create-table requests the scheduler job is
// released immediately after execution.
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  SAppInstInfo* pAppInfo = pRequest->pTscObj->pAppInfo;

  SExecResult      execResult = {0};
  SRequestConnInfo connInfo = {.pTrans = pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self};
  SSchedulerReq    schedReq = {
         .syncReq = true,
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
         .pConn = &connInfo,
         .pNodeList = pNodeList,
         .pDag = pDag,
         .sql = pRequest->sqlstr,
         .startTs = pRequest->metric.start,
         .execFp = NULL,
         .cbParam = NULL,
         .chkKillFp = chkRequestKilled,
         .chkKillParam = (void*)pRequest->self,
         .pExecRes = &execResult,
         .source = pRequest->source,
         .secureDelete = pRequest->secureDelete,
         .pWorkerCb = getTaskPoolWorkerCb(),
  };

  int32_t code = schedulerExecJob(&schedReq, &pRequest->body.queryJob);

  // replace the previously stored execution result with the new one
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
  (void)memcpy(&pRequest->body.resInfo.execRes, &execResult, sizeof(execResult));

  if (TSDB_CODE_SUCCESS == code) {
    bool isSubmit = (TDMT_VND_SUBMIT == pRequest->type);
    if (isSubmit || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
      pRequest->body.resInfo.numOfRows = execResult.numOfRows;
      if (isSubmit) {
        // account the inserted rows in the per-cluster summary
        SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, execResult.numOfRows);
      }

      schedulerFreeJob(&pRequest->body.queryJob, 0);
    }

    // the job ran: the request status is whatever the execution reported
    code = execResult.code;
  } else {
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  pRequest->code = code;
  terrno = code;
  return pRequest->code;
}
1018

1019
// Post-process a submit response: for every auto-created table that carries table meta,
// forward that meta to handleCreateTbExecRes. Stops at, and returns, the first error
// (TSC_ERR_RET also records it in terrno).
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SSubmitRsp2* pSubmitRsp = (SSubmitRsp2*)res;
  if (NULL == pSubmitRsp->aCreateTbRsp) {
    // nothing was auto-created by this submit
    return TSDB_CODE_SUCCESS;
  }

  int32_t numOfCreated = taosArrayGetSize(pSubmitRsp->aCreateTbRsp);
  for (int32_t idx = 0; idx < numOfCreated; ++idx) {
    SVCreateTbRsp* pCreateRsp = (SVCreateTbRsp*)taosArrayGet(pSubmitRsp->aCreateTbRsp, idx);
    if (NULL == pCreateRsp->pMeta) {
      continue;
    }
    TSC_ERR_RET(handleCreateTbExecRes(pCreateRsp->pMeta, pCatalog));
  }

  return TSDB_CODE_SUCCESS;
}
1036

1037
// Post-process a query execution result: `res` is an array of STbVerInfo. The table
// name and its schema/tag/row versions are collected into an STbSVersion array that is
// handed to catalogChkTbMetaVersion, using `epset` as the mgmt endpoint set.
// Returns TSDB_CODE_SUCCESS when there is nothing to check, otherwise the status of the
// collection / catalog call.
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SArray* pVerInfos = (SArray*)res;
  int32_t numOfTables = taosArrayGetSize(pVerInfos);
  if (numOfTables <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(numOfTables, sizeof(STbSVersion));
  if (NULL == pVersions) {
    return terrno;
  }

  int32_t code = 0;
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STbVerInfo* pInfo = taosArrayGet(pVerInfos, idx);
    if (NULL == pInfo) {
      code = terrno;
      goto _return;
    }

    STbSVersion version = {.tbFName = pInfo->tbFName,
                           .sver = pInfo->sversion,
                           .tver = pInfo->tversion,
                           .rver = pInfo->rversion};
    if (NULL == taosArrayPush(pVersions, &version)) {
      code = terrno;
      goto _return;
    }
  }

  SRequestConnInfo connInfo = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = *epset};

  code = catalogChkTbMetaVersion(pCatalog, &connInfo, pVersions);

_return:

  taosArrayDestroy(pVersions);
  return code;
}
1077

1078
// Forward the table meta carried by an alter-table result to catalogUpdateTableMeta.
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}
1081

1082
// Forward the table meta carried by a create-table result to catalogAsyncUpdateTableMeta.
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogAsyncUpdateTableMeta(pCatalog, pMetaRsp);
}
1085

1086
// Dispatch the execution result stored in pRequest->body.resInfo.execRes to the handler
// matching its message type (alter table, create table, create stable, submit, query).
//
// Returns pRequest->code when there is no result payload, the catalog-handle error if
// the catalog cannot be obtained, TSDB_CODE_APP_ERROR for an unexpected message type,
// and otherwise the handler's status.
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
  if (NULL == pRequest->body.resInfo.execRes.res) {
    return pRequest->code;
  }

  SCatalog*     pCatalog = NULL;
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);

  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
  if (code) {
    return code;
  }

  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
  SExecResult* pRes = &pRequest->body.resInfo.execRes;

  switch (pRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
      break;
    }
    case TDMT_VND_CREATE_TABLE: {
      SArray* pList = (SArray*)pRes->res;
      int32_t num = taosArrayGetSize(pList);
      for (int32_t i = 0; i < num; ++i) {
        void* res = taosArrayGetP(pList, i);
        // handleCreateTbExecRes will handle res == null
        int32_t tbCode = handleCreateTbExecRes(res, pCatalog);
        // every table is still processed, but the first failure is the one reported;
        // previously a later success silently overwrote an earlier error
        if (TSDB_CODE_SUCCESS == code) {
          code = tbCode;
        }
      }
      break;
    }
    case TDMT_MND_CREATE_STB: {
      code = handleCreateTbExecRes(pRes->res, pCatalog);
      break;
    }
    case TDMT_VND_SUBMIT: {
      // account the submitted bytes in the per-cluster summary
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);

      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
      break;
    }
    default:
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
               pRequest->type, pRequest->requestId);
      code = TSDB_CODE_APP_ERROR;
  }

  return code;
}
1141

1142
// True only when pStmt is a vnode-modify statement whose fileProcessing flag is set.
static bool incompletaFileParsing(SNode* pStmt) {
  if (QUERY_NODE_VNODE_MODIFY_STMT == nodeType(pStmt)) {
    return ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
  }
  return false;
}
1145

1146
// Resume parsing and planning of a request once its sub-query has produced pBlock
// (which may be NULL), then pass the outcome to handleQueryAnslyseRes.
//
// The node allocator of the parse context is acquired for the duration of the
// parse/plan steps; the time spent in the parse step is added to metric.analyseCostUs.
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;

  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    int64_t analyseStart = taosGetTimestampUs();
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
  }

  // The allocator is released unconditionally, as before. The release status must not
  // overwrite an earlier acquire/parse/plan failure, otherwise that failure would be
  // reported downstream as success.
  int32_t releaseCode = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    code = releaseCode;
  }

  handleQueryAnslyseRes(pWrapper, NULL, code);
}
1163

1164
// Deliver the final status of pRequest to the client callback.
//
// When the request has no separate user request (userRefId is 0 or refers to itself)
// its own callback is invoked. Otherwise the user request is looked up, inherits
// pRequest->code, and its callback is invoked; if it no longer exists only an error is
// logged.
void returnToUser(SRequestObj* pRequest) {
  bool isUserRequest = (0 == pRequest->relation.userRefId) || (pRequest->self == pRequest->relation.userRefId);
  if (isUserRequest) {
    // return to client
    doRequestCallback(pRequest, pRequest->code);
    return;
  }

  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
  if (NULL == pUserReq) {
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.userRefId, pRequest->requestId);
    return;
  }

  pUserReq->code = pRequest->code;
  // return to client
  doRequestCallback(pUserReq, pUserReq->code);
  (void)releaseRequest(pRequest->relation.userRefId);
}
1183

1184
// Copy numOfRows rows of the result set pRes into a newly created data block.
//
// One column is created per result field. Each row is read as (timestamp, vgId, vgVer)
// in its first three columns; the largest timestamp seen becomes the block's
// window.ekey. On success *pBlock holds the new block and TSDB_CODE_SUCCESS is
// returned. On failure the block is destroyed, *pBlock is reset to NULL so the caller
// is not left with a dangling pointer, and an error code is returned.
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
  int64_t     lastTs = 0;
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
  int32_t     numOfFields = taos_num_fields(pRes);

  if (numOfFields > 0 && NULL == pResFields) {
    tscError("invalid field info from vnode");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = createDataBlock(pBlock);
  if (code) {
    return code;
  }

  for (int32_t i = 0; i < numOfFields; ++i) {
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(*pBlock, numOfRows);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfRows; ++i) {
    TAOS_ROW pRow = taos_fetch_row(pRes);
    // taos_fetch_row yields NULL when no row is available, and the first three columns
    // are dereferenced below, so both the row and the column count must be validated
    if (NULL == pRow || numOfFields < 3 || NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
      tscError("invalid data from vnode");
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto _error;
    }
    int64_t ts = *(int64_t*)pRow[0];
    if (lastTs < ts) {
      lastTs = ts;
    }

    for (int32_t j = 0; j < numOfFields; ++j) {
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
      if (NULL == pColInfoData) {
        code = terrno;
        goto _error;
      }
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
      if (TSDB_CODE_SUCCESS != code) {
        goto _error;
      }
    }

    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
            *(int64_t*)pRow[2]);
  }

  (*pBlock)->info.window.ekey = lastTs;
  (*pBlock)->info.rows = numOfRows;

  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
  return TSDB_CODE_SUCCESS;

_error:
  blockDataDestroy(*pBlock);
  *pBlock = NULL;
  return code;
}
1240

1241
// Fetch callback of a sub-query: turn the fetched rows into a data block and feed it
// to the next request in the chain (relation.nextRefId). Any failure is reported back
// through returnToUser.
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
  SRequestObj* pRequest = (SRequestObj*)res;
  if (0 != pRequest->code) {
    // the fetch itself failed
    returnToUser(pRequest);
    return;
  }

  SSDataBlock* pResBlock = NULL;
  pRequest->code = createResultBlock(res, rowNum, &pResBlock);
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
             tstrerror(pRequest->code));
    returnToUser(pRequest);
    return;
  }

  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
  if (NULL == pNextReq) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.nextRefId, pRequest->requestId);
  } else {
    continuePostSubQuery(pNextReq, pResBlock);
    (void)releaseRequest(pRequest->relation.nextRefId);
  }

  blockDataDestroy(pResBlock);
}
1268

1269
// Continue a request chain after a sub-query finished executing.
//
// When TD_RES_QUERY holds for the request, its rows are fetched asynchronously and
// postSubQueryFetchCb takes over. Otherwise the next request in the chain is resumed
// directly without a data block.
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
  SRequestObj* pRequest = pWrapper->pRequest;
  if (TD_RES_QUERY(pRequest)) {
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
    return;
  }

  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
  if (NULL == pNextReq) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.nextRefId, pRequest->requestId);
    return;
  }

  continuePostSubQuery(pNextReq, NULL);
  (void)releaseRequest(pRequest->relation.nextRefId);
}
1285

1286
// TODO: refactor the error code management
1287
// Scheduler completion callback for an asynchronously executed job.
//
// pResult - execution result; may be NULL. Its contents are moved into the request and
//           the struct itself is freed here.
// param   - the SSqlCallbackWrapper of the request
// code    - execution status reported by the scheduler
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
  SSqlCallbackWrapper* pWrapper = param;
  SRequestObj*         pRequest = pWrapper->pRequest;
  STscObj*             pTscObj = pRequest->pTscObj;

  // Note: This is EXECUTE completion callback, not FETCH callback.
  // Scheduler job phase is authoritative. Client phase is only fallback.
  // Let heartbeat read scheduler job phase via schedulerGetJobPhase().

  pRequest->code = code;
  if (NULL != pResult) {
    // take over the content of the result, replacing what the request held before
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
  }

  int32_t reqType = pRequest->type;
  bool    isSubmit = (TDMT_VND_SUBMIT == reqType);
  if (isSubmit || TDMT_VND_DELETE == reqType || TDMT_VND_CREATE_TABLE == reqType) {
    if (NULL != pResult) {
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;

      // record the insert rows
      if (isSubmit) {
        SAppClusterSummary* pSummary = &pTscObj->pAppInfo->summary;
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, pResult->numOfRows);
      }
    }
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  taosMemoryFree(pResult);
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
           pRequest->requestId);

  // errors the client can recover from: drop the cached meta and restart the query
  bool needRetry = (code != TSDB_CODE_SUCCESS) && NEED_CLIENT_HANDLE_ERROR(code) && (pRequest->sqlstr != NULL) &&
                   (pRequest->stmtBindVersion == 0);
  if (needRetry) {
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
    restartAsyncQuery(pRequest, code);
    return;
  }

  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
  }

  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;

  // a failure while post-processing the result only replaces a so-far successful status
  int32_t rspCode = handleQueryExecRsp(pRequest);
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != rspCode) {
    pRequest->code = rspCode;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
    // the statement still has file content to process: continue with the next part
    continueInsertFromCsv(pWrapper, pRequest);
    return;
  }

  if (pRequest->relation.nextRefId) {
    handlePostSubQuery(pWrapper);
    return;
  }

  destorySqlCallbackWrapper(pWrapper);
  pRequest->pWrapper = NULL;

  // return to client
  doRequestCallback(pRequest, code);
}
1361

1362
// Execute a parsed query synchronously according to its execution mode.
//
// pRequest  - the request; its code field receives any failure
// pQuery    - the parsed query; destroyed here unless keepQuery is true
// keepQuery - when true, pQuery is left alive for the caller
// res       - optional output; when non-NULL it receives the execution result payload,
//             which is then detached from the request
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
  int32_t code = 0;

  if (pQuery->pRoot) {
    pRequest->stmtType = pQuery->pRoot->type;
    if (nodeType(pQuery->pRoot) == QUERY_NODE_DELETE_STMT) {
      pRequest->secureDelete = ((SDeleteStmt*)pQuery->pRoot)->secureDelete;
    }
  }

  // count the request in the per-cluster summary, but only once (not on retries)
  if (pQuery->pRoot && !pRequest->inRetry) {
    STscObj*            pTscObj = pRequest->pTscObj;
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
    }
  }

  pRequest->body.execMode = pQuery->execMode;
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      if (!pRequest->validateOnly) {
        if (NULL == pQuery->pRoot) {
          terrno = TSDB_CODE_INVALID_PARA;
          code = terrno;
        } else {
          code = execLocalCmd(pRequest, pQuery);
        }
      }
      break;
    case QUERY_EXEC_MODE_RPC:
      if (!pRequest->validateOnly) {
        code = execDdlQuery(pRequest, pQuery);
      }
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
      if (NULL == pMnodeList) {
        code = terrno;
        break;
      }
      SQueryPlan* pDag = NULL;
      bool        planScheduled = false;
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
      if (TSDB_CODE_SUCCESS == code) {
        pRequest->body.subplanNum = pDag->numOfSubplans;
        if (!pRequest->validateOnly) {
          SArray* pNodeList = NULL;
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);

          if (TSDB_CODE_SUCCESS == code) {
            code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
                                        taosArrayGetSize(pNodeList));
          }

          if (TSDB_CODE_SUCCESS == code) {
            planScheduled = true;
            code = scheduleQuery(pRequest, pDag, pNodeList);
          }
          taosArrayDestroy(pNodeList);
        }
      }
      // A plan that was never passed to scheduleQuery is still held only by this
      // function; destroy it, as asyncExecSchQuery does for a plan it does not execute.
      // NOTE(review): assumes the plan is released through the scheduler job once
      // scheduleQuery has been called - confirm against the scheduler.
      if (!planScheduled) {
        qDestroyQueryPlan(pDag);
      }
      taosArrayDestroy(pMnodeList);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    default:
      break;
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
    if (TSDB_CODE_SUCCESS != ret) {
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
               pRequest->requestId);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = handleQueryExecRsp(pRequest);
  }

  if (TSDB_CODE_SUCCESS != code) {
    pRequest->code = code;
  }

  if (res) {
    // hand the result payload over to the caller and detach it from the request
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }
}
1460

1461
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
727,000,391✔
1462
                                 SSqlCallbackWrapper* pWrapper) {
1463
  int32_t code = TSDB_CODE_SUCCESS;
727,000,391✔
1464
  pRequest->type = pQuery->msgType;
727,000,391✔
1465
  SArray*     pMnodeList = NULL;
727,033,176✔
1466
  SArray*     pNodeList = NULL;
727,033,176✔
1467
  SQueryPlan* pDag = NULL;
727,018,739✔
1468
  int64_t     st = taosGetTimestampUs();
727,084,625✔
1469

1470
  if (!pRequest->parseOnly) {
727,084,625✔
1471
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_PLAN);
1,454,226,666✔
1472

1473
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
727,157,817✔
1474
    if (NULL == pMnodeList) {
727,102,694✔
1475
      code = terrno;
×
1476
    }
1477
    SPlanContext cxt = {.queryId = pRequest->requestId,
828,864,892✔
1478
                        .acctId = pRequest->pTscObj->acctId,
727,125,532✔
1479
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
727,128,803✔
1480
                        .pAstRoot = pQuery->pRoot,
727,162,202✔
1481
                        .showRewrite = pQuery->showRewrite,
727,154,006✔
1482
                        .isView = pWrapper->pParseCtx->isView,
727,148,800✔
1483
                        .isAudit = pWrapper->pParseCtx->isAudit,
727,146,692✔
1484
                        .privInfo = pWrapper->pParseCtx->privInfo,
727,119,557✔
1485
                        .pMsg = pRequest->msgBuf,
727,112,176✔
1486
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1487
                        .pUser = pRequest->pTscObj->user,
727,121,825✔
1488
                        .userId = pRequest->pTscObj->userId,
727,124,221✔
1489
                        .sysInfo = pRequest->pTscObj->sysInfo,
727,118,631✔
1490
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
727,113,301✔
1491
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
727,122,118✔
1492
    if (TSDB_CODE_SUCCESS == code) {
727,126,305✔
1493
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
727,134,943✔
1494
    }
1495
    if (code) {
727,119,624✔
1496
      tscError("req:0x%" PRIx64 " requestId:0x%" PRIx64 ", failed to create query plan, code:%s msg:%s", pRequest->self,
252,247✔
1497
               pRequest->requestId, tstrerror(code), cxt.pMsg);
1498
    } else {
1499
      pRequest->body.subplanNum = pDag->numOfSubplans;
726,867,377✔
1500
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
726,863,532✔
1501
    }
1502
  }
1503

1504
  pRequest->metric.execStart = taosGetTimestampUs();
727,107,184✔
1505
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
727,103,019✔
1506

1507
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
727,053,838✔
1508
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
726,659,917✔
1509
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_SCHEDULE);
404,261,256✔
1510
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
202,135,367✔
1511
    }
1512

1513
    if (code == TSDB_CODE_SUCCESS) {
726,630,238✔
1514
      code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
726,605,550✔
1515
                                  taosArrayGetSize(pNodeList));
726,585,305✔
1516
    }
1517

1518
    if (code == TSDB_CODE_SUCCESS) {
726,663,471✔
1519
      SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
726,660,395✔
1520
                               .requestId = pRequest->requestId,
726,687,361✔
1521
                               .requestObjRefId = pRequest->self};
726,684,224✔
1522
      SSchedulerReq    req = {
777,508,111✔
1523
             .syncReq = false,
1524
             .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
726,619,890✔
1525
             .pConn = &conn,
1526
             .pNodeList = pNodeList,
1527
             .pDag = pDag,
1528
             .allocatorRefId = pRequest->allocatorRefId,
726,619,890✔
1529
             .sql = pRequest->sqlstr,
726,644,710✔
1530
             .startTs = pRequest->metric.start,
726,625,414✔
1531
             .execFp = schedulerExecCb,
1532
             .cbParam = pWrapper,
1533
             .chkKillFp = chkRequestKilled,
1534
             .chkKillParam = (void*)pRequest->self,
726,610,544✔
1535
             .pExecRes = NULL,
1536
             .source = pRequest->source,
726,647,769✔
1537
             .secureDelete = pRequest->secureDelete,
726,631,493✔
1538
             .pWorkerCb = getTaskPoolWorkerCb(),
726,621,917✔
1539
      };
1540

1541
      if (TSDB_CODE_SUCCESS == code) {
726,626,221✔
1542
        CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_EXECUTE);
1,453,377,709✔
1543
        code = schedulerExecJob(&req, &pRequest->body.queryJob);
726,696,319✔
1544
      }
1545
      taosArrayDestroy(pNodeList);
726,633,596✔
1546
      taosArrayDestroy(pMnodeList);
726,710,542✔
1547
      return code;
726,703,564✔
1548
    }
1549
  }
1550

1551
  qDestroyQueryPlan(pDag);
427,414✔
1552
  tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
444,610✔
1553
           pRequest->requestId);
1554
  destorySqlCallbackWrapper(pWrapper);
444,610✔
1555
  pRequest->pWrapper = NULL;
444,610✔
1556
  if (TSDB_CODE_SUCCESS != code) {
444,610✔
1557
    pRequest->code = code;
255,323✔
1558
  }
1559

1560
  doRequestCallback(pRequest, code);
444,610✔
1561

1562
  // todo not to be released here
1563
  taosArrayDestroy(pMnodeList);
444,610✔
1564
  taosArrayDestroy(pNodeList);
444,610✔
1565

1566
  return code;
441,435✔
1567
}
1568

1569
/*
 * Hand a parsed statement to the executor selected by pQuery->execMode.
 *
 * pRequest    - request that owns the statement
 * pQuery      - parse result; its execMode selects the execution path
 * pResultMeta - metadata forwarded to the scheduler path
 * pWrapper    - callback wrapper; only forwarded on the scheduler path and
 *               destroyed here for every other mode
 */
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
  // A parse-only request is finished as soon as it reaches this point.
  if (pRequest->parseOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  pRequest->body.execMode = pQuery->execMode;

  // Only asyncExecSchQuery receives the wrapper, so drop it for all other modes.
  if (pRequest->body.execMode != QUERY_EXEC_MODE_SCHEDULE) {
    destorySqlCallbackWrapper(pWrapper);
    pRequest->pWrapper = NULL;
  }

  // Update the per-cluster counters; retried requests are not counted again.
  if (!pRequest->inRetry && NULL != pQuery->pRoot) {
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;

    if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
    } else if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
               ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType == 0) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
    }
  }

  // The return codes of the executors are intentionally not used here.
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_SCHEDULE:
      (void)asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
      break;

    case QUERY_EXEC_MODE_RPC:
      (void)asyncExecDdlQuery(pRequest, pQuery);
      break;

    case QUERY_EXEC_MODE_LOCAL:
      asyncExecLocalCmd(pRequest, pQuery);
      break;

    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      doRequestCallback(pRequest, 0);
      break;

    default:
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
      doRequestCallback(pRequest, -1);
      break;
  }
}
1615

1616
/*
 * Refresh the catalog entries for every database and table recorded on the request.
 *
 * Returns TSDB_CODE_APP_ERROR when the request lists neither databases nor tables,
 * otherwise the first non-success code from the catalog, or success.
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  const int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  const int32_t numOfTables = taosArrayGetSize(pRequest->tableList);
  if (numOfDbs <= 0 && numOfTables <= 0) {
    return TSDB_CODE_APP_ERROR;
  }

  SCatalog* pCtg = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestConnInfo connInfo = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // Each loop stops at the first failure; a NULL element is passed through to the catalog as is.
  for (int32_t idx = 0; TSDB_CODE_SUCCESS == code && idx < numOfDbs; ++idx) {
    char* pDbFName = taosArrayGet(pRequest->dbList, idx);
    code = catalogRefreshDBVgInfo(pCtg, &connInfo, pDbFName);
  }

  for (int32_t idx = 0; TSDB_CODE_SUCCESS == code && idx < numOfTables; ++idx) {
    SName* pName = taosArrayGet(pRequest->tableList, idx);
    code = catalogRefreshTableMeta(pCtg, &connInfo, pName, -1);
  }

  return code;
}
1658

1659
/*
 * Drop the cached catalog metadata for every name in tbList.
 *
 * isView selects view metadata (NULL entries are skipped) or table metadata.
 * Returns the catalog-handle error, or whatever TSC_ERR_RET propagates from a
 * failed removal, or TSDB_CODE_SUCCESS.
 */
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
  SCatalog*     pCtg = NULL;
  const int32_t numOfNames = taosArrayGetSize(tbList);

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  for (int32_t idx = 0; idx < numOfNames; ++idx) {
    SName* pName = taosArrayGet(tbList, idx);

    if (!isView) {
      TSC_ERR_RET(catalogRemoveTableMeta(pCtg, pName));
      continue;
    }

    if (pName == NULL) {
      continue;
    }

    char fullDbName[TSDB_DB_FNAME_LEN];
    (void)tNameGetFullDbName(pName, fullDbName);
    TSC_ERR_RET(catalogRemoveViewMeta(pCtg, fullDbName, 0, pName->tname, 0));
  }

  return TSDB_CODE_SUCCESS;
}
1686

1687
/*
 * Build the mnode endpoint set from the configured firstEp / secondEp strings.
 *
 * Each non-empty endpoint is parsed into fqdn/port and its fqdn is resolved.
 * An endpoint whose fqdn cannot be resolved is logged and left out of the set;
 * it is not treated as a hard failure.
 *
 * Returns 0 when at least one endpoint was added. On failure returns a TSDB
 * error code: TSDB_CODE_TSC_INVALID_FQDN for an over-long endpoint or an
 * unparsable firstEp, the parser's code for an unparsable secondEp, and
 * TSDB_CODE_RPC_NETWORK_UNAVAIL when no endpoint could be resolved.
 */
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      // Return the error code (not -1) so this path agrees with every other failure below.
      return terrno;
    }

    SEp*    pEp = &mgmtEpSet->eps[mgmtEpSet->numOfEps];
    int32_t code = taosGetFqdnPortFromEp(firstEp, pEp);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    // Resolution is only a reachability check; the resolved address itself is not stored.
    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, pEp->fqdn, &addr);
    if (code) {
      tscError("failed to resolve firstEp fqdn: %s, code:%s", pEp->fqdn, tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      // Wipe the half-filled slot so secondEp can reuse it.
      (void)memset(pEp, 0, sizeof(*pEp));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    SEp*    pEp = &mgmtEpSet->eps[mgmtEpSet->numOfEps];
    int32_t code = taosGetFqdnPortFromEp(secondEp, pEp);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, pEp->fqdn, &addr);
    if (code) {
      tscError("failed to resolve secondEp fqdn: %s, code:%s", pEp->fqdn, tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      (void)memset(pEp, 0, sizeof(*pEp));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  return 0;
}
1746

1747
/*
 * Create a connection object, send TDMT_MND_CONNECT to the mnode endpoint set
 * and block until the response has been processed.
 *
 * On success *pTscObj holds the new connection, except for
 * CONN_TYPE__AUTH_TEST where the connection is closed again and *pTscObj is
 * NULL. On a server-side connect failure the connection is closed, *pTscObj
 * is NULL and the server's error code is returned (also stored in terrno).
 *
 * fp and param are not used by this function.
 */
int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp,
                        void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
  *pTscObj = NULL;
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pRequest = NULL;
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    return code;
  }

  pRequest->sqlstr = taosStrdup("taos_connect");
  if (pRequest->sqlstr) {
    pRequest->sqlLen = strlen(pRequest->sqlstr);
  } else {
    // NOTE(review): this path returns without releasing pRequest or *pTscObj - confirm whether that is intended.
    return terrno;
  }

  SMsgSendInfo* body = NULL;
  code = buildConnectMsg(pRequest, &body, totpCode);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    return code;
  }

  // int64_t transporterId = 0;
  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
    return code;
  }

  // Wait for the response handler to post the request semaphore.
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
    destroyTscObj(*pTscObj);
    tscError("failed to wait sem, code:%s", terrstr());
    return terrno;
  }

  if (pRequest->code != TSDB_CODE_SUCCESS) {
    // The outcome of the connect lives in pRequest->code; `code` is always success here,
    // so the FQDN check has to look at the request, not at `code`.
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    tscError("failed to connect to server, reason: %s", errorMsg);

    terrno = pRequest->code;
    destroyRequest(pRequest);
    taos_close_internal(*pTscObj);
    *pTscObj = NULL;
    return terrno;
  }

  if (connType == CONN_TYPE__AUTH_TEST) {
    // An auth test only validates credentials; the connection is not kept.
    terrno = TSDB_CODE_SUCCESS;
    destroyRequest(pRequest);
    taos_close_internal(*pTscObj);
    *pTscObj = NULL;
    return TSDB_CODE_SUCCESS;
  }

  tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
          (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
  destroyRequest(pRequest);
  return code;
}
1812

1813
/*
 * Build the send-info for a TDMT_MND_CONNECT message.
 *
 * pRequest     - connect request; its self/requestId and pTscObj fill the message
 * pMsgSendInfo - out: newly allocated send-info on success, NULL on failure
 * totpCode     - TOTP code copied into the connect request
 *
 * Returns TSDB_CODE_SUCCESS, or terrno as left by the failing step. On failure
 * everything allocated here (send-info, callback param, payload) is released.
 */
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     contLen = 0;
  void*       pReq = NULL;
  char*       db = NULL;
  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  *pMsgSendInfo = NULL;

  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pInfo == NULL) {
    return terrno;
  }

  pInfo->msgType = TDMT_MND_CONNECT;
  pInfo->requestObjRefId = pRequest->self;
  pInfo->requestId = pRequest->requestId;
  pInfo->fp = getMsgRspHandle(pInfo->msgType);

  // The response handler receives the request ref id through a heap-allocated param.
  pInfo->param = taosMemoryCalloc(1, sizeof(pRequest->self));
  if (NULL == pInfo->param) {
    code = terrno;
    goto _error;
  }
  *(int64_t*)pInfo->param = pRequest->self;

  db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  } else if (terrno) {
    code = terrno;
    goto _error;
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = appInfo.pid;
  connectReq.startTime = appInfo.startTime;
  connectReq.totpCode = totpCode;
  connectReq.connectTime = taosGetTimestampMs();

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
  tstrncpy(connectReq.token, pObj->token, sizeof(connectReq.token));
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
  tSignConnectReq(&connectReq);

  // First pass computes the encoded size; a negative size must not reach the allocator.
  contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  if (contLen < 0) {
    code = terrno;
    goto _error;
  }

  pReq = taosMemoryMalloc(contLen);
  if (NULL == pReq) {
    code = terrno;
    goto _error;
  }

  if (tSerializeSConnectReq(pReq, contLen, &connectReq) < 0) {
    code = terrno;
    goto _error;
  }

  pInfo->msgInfo.len = contLen;
  pInfo->msgInfo.pData = pReq;
  *pMsgSendInfo = pInfo;
  return TSDB_CODE_SUCCESS;

_error:
  // code was captured before any free so the cleanup cannot disturb it.
  taosMemoryFree(pReq);
  taosMemoryFree(pInfo->param);
  taosMemoryFree(pInfo);
  return code;
}
1874

1875
/*
 * Apply an endpoint-set change reported with a response.
 *
 * For an mnode target the application's mgmtEp is replaced; for a vnode target
 * the catalog's vgroup endpoint set is updated and target.dbFName is released.
 * Any other target type is only logged. Nothing happens when pEpSet is NULL,
 * and an update is skipped (with an error log) when pTscObj is NULL.
 */
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    return;
  }

  if (TARGET_TYPE_MNODE == pSendInfo->target.type) {
    if (pTscObj == NULL) {
      tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
               TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
      return;
    }

    // Snapshot the current set only for the debug trace below.
    SEpSet prevEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
    SEp*   pPrevEp = &prevEpSet.eps[prevEpSet.inUse];
    SEp*   pNextEp = &pEpSet->eps[pEpSet->inUse];
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", prevEpSet.inUse, prevEpSet.numOfEps,
             pPrevEp->fqdn, pPrevEp->port, pEpSet->inUse, pEpSet->numOfEps, pNextEp->fqdn, pNextEp->port);
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
    return;
  }

  if (TARGET_TYPE_VNODE == pSendInfo->target.type) {
    if (pTscObj == NULL) {
      tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
               TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
      return;
    }

    SCatalog* pCtg = NULL;
    int32_t   rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
    if (TSDB_CODE_SUCCESS != rc) {
      tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
               tstrerror(rc));
      return;
    }

    rc = catalogUpdateVgEpSet(pCtg, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
    if (TSDB_CODE_SUCCESS != rc) {
      tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
               tstrerror(rc));
      return;
    }

    taosMemoryFreeClear(pSendInfo->target.dbFName);
    return;
  }

  tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
}
1925

1926
/*
 * Deliver one server response to the handler registered in its send-info.
 *
 * pMsg   - response; pMsg->info.ahandle carries the SMsgSendInfo of the request.
 *          pMsg->pCont is released here on every path.
 * pEpSet - optional endpoint-set update. It is freed here on the early error
 *          paths and otherwise handed to the handler through SDataBuf.pEpSet.
 *
 * Returns TSDB_CODE_TSC_INTERNAL_ERROR when the message has no send-info or the
 * acquired request does not match the recorded ref id, else TSDB_CODE_SUCCESS.
 */
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pMsg->info.ahandle == NULL) {
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
    rpcFreeCont(pMsg->pCont);
    taosMemoryFree(pEpSet);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  STscObj* pTscObj = NULL;
  bool     reqRefHeld = false;  // true once taosAcquireRef succeeded and must be balanced

  STraceId* trace = &pMsg->info.traceId;
  char      tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));

  if (pSendInfo->requestObjRefId != 0) {
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest) {
      if (pRequest->self != pSendInfo->requestObjRefId) {
        // Ids are printed in hex to match the 0x prefix and the other request logs.
        tscError("doProcessMsgFromServer req:0x%" PRIx64 " != pSendInfo->requestObjRefId:0x%" PRIx64, pRequest->self,
                 pSendInfo->requestObjRefId);

        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
          tscError("doProcessMsgFromServer taosReleaseRef failed");
        }
        rpcFreeCont(pMsg->pCont);
        taosMemoryFree(pEpSet);
        destroySendMsgInfo(pSendInfo);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      reqRefHeld = true;
      pTscObj = pRequest->pTscObj;
    }
  }

  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

  SDataBuf buf = {.msgType = pMsg->msgType,
                  .len = pMsg->contLen,
                  .pData = NULL,
                  .handle = pMsg->info.handle,
                  .handleRefId = pMsg->info.refId,
                  .pEpSet = pEpSet};

  // The handler gets its own copy of the payload; pMsg->pCont is freed below.
  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      pMsg->code = terrno;
      buf.len = 0;  // keep len consistent with the missing payload
    } else {
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  // Balance the acquire above whenever it succeeded, independent of pTscObj.
  if (reqRefHeld) {
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("doProcessMsgFromServer taosReleaseRef failed");
      terrno = code;
      pMsg->code = code;
    }
  }

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
  return TSDB_CODE_SUCCESS;
}
1996

1997
int32_t doProcessMsgFromServer(void* param) {
2,046,440,495✔
1998
  AsyncArg* arg = (AsyncArg*)param;
2,046,440,495✔
1999
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
2,046,440,495✔
2000
  taosMemoryFree(arg);
2,046,392,519✔
2001
  return code;
2,046,398,181✔
2002
}
2003

2004
/*
 * Response callback: normalises a few RPC error codes, copies the message and
 * the optional endpoint set, and queues the processing via taosAsyncExec.
 * If a copy cannot be allocated or the task cannot be queued, the message is
 * processed inline with pMsg->code set to the failure code.
 *
 * parent is not used.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  int32_t    code = 0;
  SEpSet*    pEpCopy = NULL;
  AsyncArg*  pArg = NULL;
  const bool isConnectRsp = (pMsg->msgType == TDMT_MND_CONNECT + 1);

  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);

  // Take a private copy of the endpoint set for the queued task.
  if (NULL != pEpSet) {
    pEpCopy = taosMemoryCalloc(1, sizeof(SEpSet));
    if (NULL == pEpCopy) {
      code = terrno;
      pMsg->code = terrno;
      goto _exit;
    }
    (void)memcpy((void*)pEpCopy, (void*)pEpSet, sizeof(SEpSet));
  }

  // pMsg is a response msg. Connect responses get the original code restored;
  // every other response is unified to TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED.
  if (isConnectRsp && pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
    pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
    pMsg->code = isConnectRsp ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
  }

  pArg = taosMemoryCalloc(1, sizeof(AsyncArg));
  if (NULL == pArg) {
    code = terrno;
    pMsg->code = code;
    goto _exit;
  }

  pArg->msg = *pMsg;
  pArg->pEpset = pEpCopy;

  code = taosAsyncExec(doProcessMsgFromServer, pArg, NULL);
  if (0 == code) {
    return;
  }

  // Queuing failed: drop the task argument and fall through to inline processing.
  pMsg->code = code;
  taosMemoryFree(pArg);

_exit:
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
  code = doProcessMsgFromServerImpl(pMsg, pEpCopy);
  if (code != 0) {
    tscError("failed to sched msg to tsc, tsc ready quit");
  }
}
2059

2060
TAOS* taos_connect_totp(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
2,277✔
2061
                        uint16_t port) {
2062
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
2,277✔
2063
  if (user == NULL) {
2,277✔
2064
    user = TSDB_DEFAULT_USER;
×
2065
  }
2066

2067
  if (pass == NULL) {
2,277✔
2068
    pass = TSDB_DEFAULT_PASS;
×
2069
  }
2070

2071
  STscObj* pObj = NULL;
2,277✔
2072
  int32_t  code = taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__QUERY, &pObj);
2,277✔
2073
  if (TSDB_CODE_SUCCESS == code) {
2,277✔
2074
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1,512✔
2075
    if (NULL == rid) {
1,512✔
2076
      tscError("out of memory when taos_connect_totp to %s:%u, user:%s db:%s", ip, port, user, db);
×
2077
      return NULL;
×
2078
    }
2079
    *rid = pObj->id;
1,512✔
2080
    return (TAOS*)rid;
1,512✔
2081
  } else {
2082
    terrno = code;
765✔
2083
  }
2084

2085
  return NULL;
765✔
2086
}
2087

2088
int taos_connect_test(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
192✔
2089
                      uint16_t port) {
2090
  tscInfo("try to test connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
192✔
2091
  if (user == NULL) {
192✔
2092
    user = TSDB_DEFAULT_USER;
×
2093
  }
2094

2095
  if (pass == NULL) {
192✔
2096
    pass = TSDB_DEFAULT_PASS;
×
2097
  }
2098

2099
  STscObj* pObj = NULL;
192✔
2100
  return taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__AUTH_TEST, &pObj);
192✔
2101
}
2102

2103
/*
 * Open a query connection authenticated by token.
 *
 * Returns a heap-allocated handle holding the connection id, or NULL on
 * failure (terrno is set to the connect error code when the connect fails).
 */
TAOS* taos_connect_token(const char* ip, const char* token, const char* db, uint16_t port) {
  tscInfo("try to connect to %s:%u by token, db:%s", ip, port, db);

  STscObj* pConn = NULL;
  int32_t  rc = taos_connect_by_auth(ip, NULL, token, NULL, db, port, CONN_TYPE__QUERY, &pConn);
  if (TSDB_CODE_SUCCESS != rc) {
    terrno = rc;
    return NULL;
  }

  // The public handle is a heap cell that stores the connection's ref id.
  int64_t* pHandle = taosMemoryCalloc(1, sizeof(int64_t));
  if (pHandle == NULL) {
    tscError("out of memory when taos_connect_token to %s:%u db:%s", ip, port, db);
    return NULL;
  }

  *pHandle = pConn->id;
  return (TAOS*)pHandle;
}
2122

2123
/*
 * Open a query connection authenticated with a pre-computed auth string.
 *
 * A NULL user falls back to TSDB_DEFAULT_USER; a NULL auth is rejected.
 * Returns a heap-allocated handle holding the connection id, or NULL on
 * failure. When the connect itself fails, terrno carries its error code,
 * matching taos_connect_totp and taos_connect_token.
 */
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
  if (user == NULL) {
    user = TSDB_DEFAULT_USER;
  }

  if (auth == NULL) {
    tscError("No auth info is given, failed to connect to server");
    return NULL;
  }

  STscObj* pObj = NULL;
  int32_t  code = taos_connect_by_auth(ip, user, auth, NULL, db, port, CONN_TYPE__QUERY, &pObj);
  if (TSDB_CODE_SUCCESS == code) {
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
    if (NULL == rid) {
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
      // Bail out here: falling through would write through the NULL handle.
      return NULL;
    }
    *rid = pObj->id;
    return (TAOS*)rid;
  } else {
    terrno = code;
  }

  return NULL;
}
2147

2148
/*
 * Point row[] / length[] at the cells of the row indexed by pResultInfo->current.
 * A NULL cell gets row[i] = NULL and length[i] = 0. `current` is not advanced here.
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    SResultColumn* pCol = &pResultInfo->pCol[col];

    int32_t type = pResultInfo->fields[col].type;
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->userFields[col].bytes, false);

    if (!IS_VAR_DATA_TYPE(type)) {
      // Fixed-width column: cells are laid out back to back, schemaBytes apart.
      if (colDataIsNull_f(pCol, pResultInfo->current)) {
        pResultInfo->row[col] = NULL;
        pResultInfo->length[col] = 0;
      } else {
        pResultInfo->row[col] = pCol->pData + schemaBytes * pResultInfo->current;
        pResultInfo->length[col] = schemaBytes;
      }
      continue;
    }

    // Variable-length column: an offset of -1 marks a NULL cell.
    if (IS_VAR_NULL_TYPE(type, schemaBytes) || pCol->offset[pResultInfo->current] == -1) {
      pResultInfo->row[col] = NULL;
      pResultInfo->length[col] = 0;
      continue;
    }

    char* pCell = pCol->offset[pResultInfo->current] + pCol->pData;
    if (IS_STR_DATA_BLOB(type)) {
      pResultInfo->length[col] = blobDataLen(pCell);
      pResultInfo->row[col] = blobDataVal(pCell);
    } else {
      pResultInfo->length[col] = varDataLen(pCell);
      pResultInfo->row[col] = varDataVal(pCell);
    }
  }
}
2,147,483,647✔
2181

2182
/*
 * Synchronous fetch: return the row-pointer array for the request, pulling the
 * next block from the scheduler when the current one is exhausted.
 *
 * setupOneRowPtr - when true, fill row[]/length[] for the current row and advance
 * convertUcs4    - forwarded to setQueryResultFromRsp
 *
 * Returns NULL when pRequest is NULL, when no rows remain, or on error
 * (the error code is stored in pRequest->code).
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (NULL == pRequest) {
    return NULL;
  }

  SReqResultInfo* pRes = &pRequest->body.resInfo;
  const bool      blockExhausted = (pRes->pData == NULL) || (pRes->current >= pRes->numOfRows);

  if (blockExhausted) {
    // All data has returned to App already, no need to try again
    if (pRes->completed) {
      pRes->numOfRows = 0;
      return NULL;
    }

    SSchedulerReq fetchReq = {.syncReq = true, .pFetchRes = (void**)&pRes->pData};

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &fetchReq);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pRes->numOfRows = 0;
      return NULL;
    }

    pRequest->code = setQueryResultFromRsp(pRes, (const SRetrieveTableRsp*)pRes->pData, convertUcs4,
                                           pRequest->stmtBindVersion > 0);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pRes->numOfRows = 0;
      return NULL;
    }

    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
             ", complete:%d, QID:0x%" PRIx64,
             pRequest->self, pRes->numOfRows, pRes->totalRows, pRes->completed, pRequest->requestId);

    // Account the fetched payload in the per-cluster statistics.
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    (void)atomic_add_fetch_64((int64_t*)&pSummary->fetchBytes, pRequest->body.resInfo.payloadLen);

    if (0 == pRes->numOfRows) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pRes);
    pRes->current += 1;
  }

  return pRes->row;
}
2231

2232
/*
 * Fetch callback used by doAsyncFetchRows: param is the semaphore to post.
 * res and numOfRows are not used.
 */
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
  tsem_t* pSem = (tsem_t*)param;

  if (tsem_post(pSem) != TSDB_CODE_SUCCESS) {
    tscError("failed to post sem, code:%s", terrstr());
  }
}
229,221,372✔
2238

2239
/*
 * Blocking fetch built on taos_fetch_rows_a: when the current block is
 * exhausted, start an async fetch and wait on a local semaphore for it.
 *
 * setupOneRowPtr - when true, fill row[]/length[] for the current row and advance
 * convertUcs4    - stored in the result info before the fetch is issued
 *
 * Returns the row-pointer array, or NULL when pRequest is NULL, when there are
 * no rows, or when pRequest->code is not success.
 */
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (NULL == pRequest) {
    return NULL;
  }

  SReqResultInfo* pRes = &pRequest->body.resInfo;
  const bool      blockExhausted = (pRes->pData == NULL) || (pRes->current >= pRes->numOfRows);

  if (blockExhausted) {
    // All data has returned to App already, no need to try again
    if (pRes->completed) {
      pRes->numOfRows = 0;
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
      return NULL;
    }

    // convert ucs4 to native multi-bytes string
    pRes->convertUcs4 = convertUcs4;

    // Semaphore failures are logged only; the fetch is still attempted.
    tsem_t fetchDone;
    if (tsem_init(&fetchDone, 0, 0) != TSDB_CODE_SUCCESS) {
      tscError("failed to init sem, code:%s", terrstr());
    }

    taos_fetch_rows_a(pRequest, syncFetchFn, &fetchDone);

    if (tsem_wait(&fetchDone) != TSDB_CODE_SUCCESS) {
      tscError("failed to wait sem, code:%s", terrstr());
    }
    if (tsem_destroy(&fetchDone) != TSDB_CODE_SUCCESS) {
      tscError("failed to destroy sem, code:%s", terrstr());
    }

    pRequest->inCallback = false;
  }

  if (0 == pRes->numOfRows || TSDB_CODE_SUCCESS != pRequest->code) {
    return NULL;
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pRes);
    pRes->current += 1;
  }

  return pRes->row;
}
2280

2281
/*
 * Allocate the per-column arrays (row, pCol, length, convertBuf) of a result
 * info the first time it is used; a non-NULL row means they already exist.
 *
 * Returns TSDB_CODE_SUCCESS, or terrno when any allocation fails. On failure
 * all four pointers are released and reset to NULL, so a later call allocates
 * afresh rather than treating freed memory as initialised.
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
  pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
  pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

  if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
    // Capture the error before cleanup, then clear every pointer that is freed.
    int32_t code = terrno;
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2299

2300
/*
 * Convert every non-empty NCHAR column of the current block from UCS-4 to the client's multi-byte
 * charset. The converted var-strings are packed into convertBuf[col]; the column's offset array,
 * pData and row pointer are redirected to that buffer.
 *
 * colLength holds the byte length of each column's data area and is used as the size of the
 * conversion buffer. Returns TSDB_CODE_SUCCESS, terrno on allocation failure, or
 * TSDB_CODE_TSC_INTERNAL_ERROR when no converter is available or a converted value is out of range.
 */
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
  int32_t convIdx = -1;
  iconv_t conv = taosAcquireConv(&convIdx, C2M, pResultInfo->charsetCxt);
  if (conv == (iconv_t)-1) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    int32_t colType = pResultInfo->fields[col].type;
    int32_t schemaBytes =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[col].type, pResultInfo->fields[col].bytes, isStmt);

    // only NCHAR columns that actually carry data need conversion
    if (colType != TSDB_DATA_TYPE_NCHAR || colLength[col] <= 0) {
      continue;
    }

    char* pOut = taosMemoryRealloc(pResultInfo->convertBuf[col], colLength[col]);
    if (pOut == NULL) {
      taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
      return terrno;
    }
    pResultInfo->convertBuf[col] = pOut;

    SResultColumn* pColumn = &pResultInfo->pCol[col];
    for (int32_t r = 0; r < pResultInfo->numOfRows; ++r) {
      if (pColumn->offset[r] == -1) {
        continue;  // NULL value, nothing to convert
      }

      char*   pSrc = pColumn->pData + pColumn->offset[r];
      int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pSrc), varDataLen(pSrc), varDataVal(pOut), conv);

      bool outOfRange = (len < 0) || (len > schemaBytes) ||
                        ((pOut + len) >= (pResultInfo->convertBuf[col] + colLength[col]));
      if (outOfRange) {
        tscError(
            "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
            "colLength[i]):%p",
            len, schemaBytes, (pOut + len), (pResultInfo->convertBuf[col] + colLength[col]));
        taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      // finish the var-string header and point the row's offset at the converted copy
      varDataSetLen(pOut, len);
      pColumn->offset[r] = (pOut - pResultInfo->convertBuf[col]);
      pOut += (len + VARSTR_HEADER_SIZE);
    }

    pColumn->pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pColumn->pData;
  }

  taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
  return TSDB_CODE_SUCCESS;
}
2347

2348
/*
 * Render every decimal column of the current block as text. Each value is written by decimalToStr
 * into a fixed-width slot of convertBuf[col]; the column's pData and row pointer are then
 * redirected to that buffer, and the reported byte width of the field (both the extended and the
 * user-visible descriptor) becomes the slot width.
 *
 * Returns 0 on success, terrno when the buffer cannot be (re)allocated, or the error code of
 * decimalToStr. The field widths are only rewritten once the buffer exists, so an allocation
 * failure leaves the schema untouched.
 */
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
  const int32_t bufLen = 64;  // width of one text slot, in bytes

  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + i;
    TAOS_FIELD*   pField = pResultInfo->userFields + i;
    int32_t       type = pFieldE->type;

    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[i].pData) {
      continue;
    }

    char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], bufLen * pResultInfo->numOfRows);
    if (!p) {
      return terrno;
    }
    pResultInfo->convertBuf[i] = p;

    pFieldE->bytes = bufLen;
    pField->bytes = bufLen;

    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[i].pData + j * tDataTypes[type].bytes), type,
                                  pFieldE->precision, pFieldE->scale, p, bufLen);
      p += bufLen;
      if (TSDB_CODE_SUCCESS != code) {
        return code;
      }
    }

    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
  }

  return 0;
}
2379

2380
/*
 * Size in bytes of the metadata that precedes the per-column length array of a data block:
 * five 32-bit header words, one 64-bit word, and one (int8 type, int32 bytes) schema entry per
 * column. The block pointer `p` is accepted for interface compatibility but is not inspected.
 */
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
  (void)p;

  const size_t headerBytes = sizeof(int32_t) * 5 + sizeof(uint64_t);
  const size_t schemaEntryBytes = sizeof(int8_t) + sizeof(int32_t);

  return (int32_t)(headerBytes + numOfCols * schemaEntryBytes);
}
2384

2385
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
194,897✔
2386
  char*   p = (char*)pResultInfo->pData;
194,897✔
2387
  int32_t blockVersion = *(int32_t*)p;
194,897✔
2388

2389
  int32_t numOfRows = pResultInfo->numOfRows;
194,897✔
2390
  int32_t numOfCols = pResultInfo->numOfCols;
194,897✔
2391

2392
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2393
  // length |
2394
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
194,897✔
2395
  if (numOfCols != cols) {
194,897✔
2396
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2397
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2398
  }
2399

2400
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
194,897✔
2401
  int32_t* colLength = (int32_t*)(p + len);
194,897✔
2402
  len += sizeof(int32_t) * numOfCols;
194,897✔
2403

2404
  char* pStart = p + len;
194,897✔
2405
  for (int32_t i = 0; i < numOfCols; ++i) {
832,113✔
2406
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
637,216✔
2407

2408
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
637,216✔
2409
      int32_t* offset = (int32_t*)pStart;
226,964✔
2410
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
226,964✔
2411
      len += lenTmp;
226,964✔
2412
      pStart += lenTmp;
226,964✔
2413

2414
      int32_t estimateColLen = 0;
226,964✔
2415
      for (int32_t j = 0; j < numOfRows; ++j) {
1,099,296✔
2416
        if (offset[j] == -1) {
872,332✔
2417
          continue;
42,580✔
2418
        }
2419
        char* data = offset[j] + pStart;
829,752✔
2420

2421
        int32_t jsonInnerType = *data;
829,752✔
2422
        char*   jsonInnerData = data + CHAR_BYTES;
829,752✔
2423
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
829,752✔
2424
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
11,568✔
2425
        } else if (tTagIsJson(data)) {
818,184✔
2426
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
198,468✔
2427
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
619,716✔
2428
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
576,336✔
2429
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
43,380✔
2430
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
31,812✔
2431
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
11,568✔
2432
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
11,568✔
2433
        } else if (IS_STR_DATA_BLOB(jsonInnerType)) {
×
2434
          estimateColLen += (BLOBSTR_HEADER_SIZE + 32);
×
2435
        } else {
2436
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
2437
          return -1;
×
2438
        }
2439
      }
2440
      len += TMAX(colLen, estimateColLen);
226,964✔
2441
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
410,252✔
2442
      int32_t lenTmp = numOfRows * sizeof(int32_t);
57,840✔
2443
      len += (lenTmp + colLen);
57,840✔
2444
      pStart += lenTmp;
57,840✔
2445
    } else {
2446
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
352,412✔
2447
      len += (lenTmp + colLen);
352,412✔
2448
      pStart += lenTmp;
352,412✔
2449
    }
2450
    pStart += colLen;
637,216✔
2451
  }
2452

2453
  // Ensure the complete structure of the block, including the blankfill field,
2454
  // even though it is not used on the client side.
2455
  len += sizeof(bool);
194,897✔
2456
  return len;
194,897✔
2457
}
2458

2459
/*
 * If the result set contains at least one JSON column, build a copy of the current data block in
 * pResultInfo->convertJson in which every JSON cell is replaced by its printable var-string form
 * (null literal, serialized JSON object, quoted string, double or boolean), then point
 * pResultInfo->pData at that copy. Blocks without JSON columns are left untouched.
 *
 * The copy is sized by estimateJsonLen(); every converted value is checked against that size
 * before it is written, so an under-estimate is reported as an error instead of overrunning the
 * heap buffer.
 *
 * Returns TSDB_CODE_SUCCESS, terrno on allocation / JSON serialization failure, or
 * TSDB_CODE_TSC_INTERNAL_ERROR on inconsistent block contents.
 */
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
  int32_t numOfRows = pResultInfo->numOfRows;
  int32_t numOfCols = pResultInfo->numOfCols;
  bool    needConvert = false;
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      needConvert = true;
      break;
    }
  }

  if (!needConvert) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to convert form json format string");

  char*   p = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)p;
  int32_t dataLen = estimateJsonLen(pResultInfo);
  if (dataLen <= 0) {
    tscError("doConvertJson error: estimateJsonLen failed");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->convertJson);
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
  if (pResultInfo->convertJson == NULL) return terrno;
  char* p1 = pResultInfo->convertJson;
  char* pOutBase = p1;  // start of the output buffer, used for bounds checks

  int32_t totalLen = 0;
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
  if (numOfCols != cols) {
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // copy the fixed header and the column schema
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
  (void)memcpy(p1, p, len);

  p += len;
  p1 += len;
  totalLen += len;

  // copy the per-column length array; JSON entries are patched below
  len = sizeof(int32_t) * numOfCols;
  int32_t* colLength = (int32_t*)p;
  int32_t* colLength1 = (int32_t*)p1;
  (void)memcpy(p1, p, len);
  p += len;
  p1 += len;
  totalLen += len;

  char* pStart = p;
  char* pStart1 = p1;
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
    if (colLen >= dataLen) {
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      int32_t* offset = (int32_t*)pStart;
      int32_t* offset1 = (int32_t*)pStart1;
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;

      // from here on `len` is the number of converted bytes written for this column
      len = 0;
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {
          continue;
        }
        char* data = offset[j] + pStart;

        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (tTagIsJson(data)) {
          char* jsonString = NULL;
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
          if (jsonString == NULL) {
            tscError("doConvertJson error: parseTagDatatoJson failed");
            return terrno;
          }
          STR_TO_VARSTR(dst, jsonString);
          taosMemoryFree(jsonString);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          *(char*)varDataVal(dst) = '\"';
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(tmp),
                                         pResultInfo->charsetCxt);
          if (length <= 0) {
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
            length = 0;
          }
          int32_t escapeLength = escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - CHAR_BYTES * 2,
                                                 varDataVal(tmp), length);
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
          // successful conversions are not errors; keep the trace at debug level
          tscDebug("value:%s.", varDataVal(dst));
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          double jsonVd = *(double*)(jsonInnerData);
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else {
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        // never write past the buffer that was sized from the estimate
        int64_t valueLen = (int64_t)varDataTLen(dst);
        int64_t endPos = (int64_t)(pStart1 - pOutBase) + len + valueLen;
        if (endPos > dataLen) {
          tscError("doConvertJson error: converted data exceeds estimated length, need:%" PRId64 " dataLen:%d", endPos,
                   dataLen);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        offset1[j] = len;
        (void)memcpy(pStart1 + len, dst, valueLen);
        len += valueLen;
      }
      colLen1 = len;
      totalLen += colLen1;
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    } else {
      len = BitmapLen(pResultInfo->numOfRows);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    }
    pStart += colLen;
    pStart1 += colLen1;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  // (void)memcpy(pStart1, pStart, sizeof(bool));
  totalLen += sizeof(bool);

  // second header word is the total block length; update it for the rewritten block
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
  pResultInfo->pData = pResultInfo->convertJson;
  return TSDB_CODE_SUCCESS;
}
2616

2617
/*
 * Parse the data block referenced by pResultInfo->pData and wire up the per-column pointers
 * (offset array or null bitmap, data start, schema length, first-row pointer) of the result set.
 *
 * Steps: allocate the bookkeeping arrays, rewrite JSON columns, validate the block header against
 * the result set, walk the column schema (filling in decimal precision/scale when still unset),
 * walk the column data areas, and finally - when convertUcs4 is set - convert NCHAR columns to the
 * client charset and decimal columns to text.
 *
 * For BLOCK_VERSION_1 blocks the column length array is byte-swapped in place.
 * Returns TSDB_CODE_SUCCESS (also for an empty block) or an error code.
 */
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
    tscError("setResultDataPtr paras error");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pResultInfo->numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pResultInfo->pData == NULL) {
    tscError("setResultDataPtr error: pData is NULL");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code == TSDB_CODE_SUCCESS) {
    code = doConvertJson(pResultInfo);
  }
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // fixed header: version, total length, rows, columns, flag segment, then the 64-bit group id
  char*          pBase = (char*)pResultInfo->pData;
  const int32_t* pHeader = (const int32_t*)pBase;

  int32_t blockVersion = pHeader[0];
  int32_t dataLen = pHeader[1];
  int32_t rows = pHeader[2];
  int32_t cols = pHeader[3];

  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t hasColumnSeg = pHeader[4];
  (void)hasColumnSeg;  // read for completeness, not used on the client side

  char*    pCursor = pBase + sizeof(int32_t) * 5;
  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)pCursor);
  (void)groupId;  // read for completeness, not used on the client side
  pCursor += sizeof(uint64_t);

  // column schema: one (int8 type, int32 bytes) pair per column
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    int8_t colType = *(int8_t*)pCursor;
    pCursor += sizeof(int8_t);

    int32_t colBytes = *(int32_t*)pCursor;
    pCursor += sizeof(int32_t);

    if (IS_DECIMAL_TYPE(colType) && pResultInfo->fields[col].precision == 0) {
      extractDecimalTypeInfoFromBytes(&colBytes, &pResultInfo->fields[col].precision, &pResultInfo->fields[col].scale);
    }
  }

  // per-column data length array, followed by the column data areas
  int32_t* colLength = (int32_t*)pCursor;
  pCursor += sizeof(int32_t) * pResultInfo->numOfCols;

  char* pColStart = pCursor;
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    if ((pColStart - pBase) >= dataLen) {
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (blockVersion == BLOCK_VERSION_1) {
      colLength[col] = htonl(colLength[col]);
    }
    if (colLength[col] >= dataLen) {
      tscError("invalid colLength %d, dataLen %d", colLength[col], dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_INVALID_TYPE(pResultInfo->fields[col].type)) {
      tscError("invalid type %d", pResultInfo->fields[col].type);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }

    // variable-length columns start with an offset array, fixed-length ones with a null bitmap
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[col].type)) {
      pResultInfo->pCol[col].offset = (int32_t*)pColStart;
      pColStart += pResultInfo->numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[col].nullbitmap = pColStart;
      pColStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[col].pData = pColStart;
    pResultInfo->length[col] =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[col].type, pResultInfo->fields[col].bytes, isStmt);
    pResultInfo->row[col] = pResultInfo->pCol[col].pData;

    pColStart += colLength[col];
  }

  // the trailing blankFill flag is skipped, but must still lie inside the block
  // bool blankFill = *(bool*)pColStart;
  pCursor = pColStart + sizeof(bool);
  int32_t consumed = pCursor - pBase;
  if (consumed > dataLen) {
    tscError("invalid offset %d, dataLen %d", consumed, dataLen);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
  }
#endif
  // decimal-to-text conversion is requested by the same flag as the charset conversion
  if (TSDB_CODE_SUCCESS == code && convertUcs4) {
    code = convertDecimalType(pResultInfo);
  }
  return code;
}
2737

2738
/*
 * Return a heap-allocated copy of the connection's current database name, taken under the
 * connection mutex, or NULL when no database is selected or the copy cannot be allocated.
 * terrno is reset to TSDB_CODE_SUCCESS on entry. The copy comes from taosStrndup.
 */
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;

  terrno = TSDB_CODE_SUCCESS;
  (void)taosThreadMutexLock(&pObj->mutex);

  // an empty name means no database has been selected on this connection
  if (pObj->db[0] != '\0') {
    dbCopy = taosStrndup(pObj->db, tListLen(pObj->db));
    if (dbCopy == NULL) {
      tscError("failed to taosStrndup db name");
    }
  }

  (void)taosThreadMutexUnlock(&pObj->mutex);
  return dbCopy;
}
2753

2754
/*
 * Record `db` as the connection's current database. The name is copied, truncated to the size of
 * the connection's db field, while holding the connection mutex. NULL arguments are logged and
 * ignored.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  if (db != NULL && pTscObj != NULL) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
    return;
  }

  tscError("setConnectionDB para is NULL");
}
2764

2765
/*
 * Clear the connection's current database name (set it to the empty string) under the connection
 * mutex. A NULL connection is ignored.
 */
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj != NULL) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = 0;
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
  }
}
2774

2775
/*
 * Make sure pResultInfo->decompBuf can hold at least `size` bytes, allocating or growing it as
 * needed. Returns TSDB_CODE_SUCCESS or terrno when the allocation fails.
 */
static int32_t ensureDecompBuf(SReqResultInfo* pResultInfo, int32_t size) {
  if (pResultInfo->decompBuf == NULL) {
    pResultInfo->decompBuf = taosMemoryMalloc(size);
    if (pResultInfo->decompBuf == NULL) {
      tscError("failed to prepare the decompress buffer, size:%d", size);
      return terrno;
    }
    pResultInfo->decompBufSize = size;
    return TSDB_CODE_SUCCESS;
  }

  if (pResultInfo->decompBufSize < size) {
    // keep the old buffer reachable until the reallocation is known to have succeeded
    char* p = taosMemoryRealloc(pResultInfo->decompBuf, size);
    if (p == NULL) {
      tscError("failed to prepare the decompress buffer, size:%d", size);
      return terrno;
    }

    pResultInfo->decompBuf = p;
    pResultInfo->decompBufSize = size;
  }

  return TSDB_CODE_SUCCESS;
}

/*
 * Install a retrieve response as the current block of a result set.
 *
 * Takes over `pRsp` as the response message (the previously held message is freed), resets the
 * row cursor, records row count / completion / precision, decompresses the payload when the
 * response is flagged as compressed, and finally calls setResultDataPtr() to wire up the column
 * pointers.
 *
 * The payload starts with two 32-bit words (compressed length, raw length) followed by the data.
 * The decompression target is guaranteed to be at least `rawLen` bytes before decompressing.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
                              bool isStmt) {
  if (pResultInfo == NULL || pRsp == NULL) {
    tscError("setQueryResultFromRsp paras is null");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->pRspMsg);
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->precision = pRsp->precision;

  // decompress data if needed
  int32_t payloadLen = htonl(pRsp->payloadLen);

  if (pRsp->compressed) {
    int32_t code = ensureDecompBuf(pResultInfo, payloadLen);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
  }

  if (payloadLen > 0) {
    int32_t compLen = *(int32_t*)pRsp->data;
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));

    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;

    if (pRsp->compressed && compLen < rawLen) {
      if (compLen < 0) {
        tscError("invalid compressed length, compLen:%d rawLen:%d", compLen, rawLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      // the buffer was sized from payloadLen; the decompressor writes rawLen bytes
      int32_t code = ensureDecompBuf(pResultInfo, rawLen);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }

      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      if (len < 0) {
        tscError("tsDecompressString failed");
        return terrno ? terrno : TSDB_CODE_FAILED;
      }
      if (len != rawLen) {
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pResultInfo->pData = pResultInfo->decompBuf;
      pResultInfo->payloadLen = rawLen;
    } else {
      pResultInfo->pData = pStart;
      pResultInfo->payloadLen = htonl(pRsp->compLen);
      if (pRsp->compLen != pRsp->payloadLen) {
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
    }
  }

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
  return code;
}
2848

2849
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
252✔
2850
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
252✔
2851
  void*              clientRpc = NULL;
252✔
2852
  SServerStatusRsp   statusRsp = {0};
252✔
2853
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
252✔
2854
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
252✔
2855
  SRpcMsg  rpcRsp = {0};
252✔
2856
  SRpcInit rpcInit = {0};
252✔
2857
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
252✔
2858

2859
  rpcInit.label = "CHK";
252✔
2860
  rpcInit.numOfThreads = 1;
252✔
2861
  rpcInit.cfp = NULL;
252✔
2862
  rpcInit.sessions = 16;
252✔
2863
  rpcInit.connType = TAOS_CONN_CLIENT;
252✔
2864
  rpcInit.idleTime = tsShellActivityTimer * 1000;
252✔
2865
  rpcInit.compressSize = tsCompressMsgSize;
252✔
2866
  rpcInit.user = "_dnd";
252✔
2867

2868
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
252✔
2869
  connLimitNum = TMAX(connLimitNum, 10);
252✔
2870
  connLimitNum = TMIN(connLimitNum, 500);
252✔
2871
  rpcInit.connLimitNum = connLimitNum;
252✔
2872
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
252✔
2873
  rpcInit.readTimeout = tsReadTimeout;
252✔
2874
  rpcInit.ipv6 = tsEnableIpv6;
252✔
2875
  rpcInit.enableSSL = tsEnableTLS;
252✔
2876

2877
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
252✔
2878
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
252✔
2879
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
252✔
2880
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
252✔
2881
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
252✔
2882

2883
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
252✔
2884
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2885
    goto _OVER;
×
2886
  }
2887

2888
  clientRpc = rpcOpen(&rpcInit);
252✔
2889
  if (clientRpc == NULL) {
252✔
2890
    code = terrno;
×
2891
    tscError("failed to init server status client since %s", tstrerror(code));
×
2892
    goto _OVER;
×
2893
  }
2894

2895
  if (fqdn == NULL) {
252✔
2896
    fqdn = tsLocalFqdn;
252✔
2897
  }
2898

2899
  if (port == 0) {
252✔
2900
    port = tsServerPort;
252✔
2901
  }
2902

2903
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
252✔
2904
  epSet.eps[0].port = (uint16_t)port;
252✔
2905
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
252✔
2906
  if (TSDB_CODE_SUCCESS != ret) {
252✔
2907
    tscError("failed to send recv since %s", tstrerror(ret));
×
2908
    goto _OVER;
×
2909
  }
2910

2911
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
252✔
2912
    tscError("failed to send server status req since %s", terrstr());
60✔
2913
    goto _OVER;
60✔
2914
  }
2915

2916
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
192✔
2917
    tscError("failed to parse server status rsp since %s", terrstr());
×
2918
    goto _OVER;
×
2919
  }
2920

2921
  code = statusRsp.statusCode;
192✔
2922
  if (details != NULL) {
192✔
2923
    tstrncpy(details, statusRsp.details, maxlen);
192✔
2924
  }
2925

2926
_OVER:
237✔
2927
  if (clientRpc != NULL) {
252✔
2928
    rpcClose(clientRpc);
252✔
2929
  }
2930
  if (rpcRsp.pCont != NULL) {
252✔
2931
    rpcFreeCont(rpcRsp.pCont);
192✔
2932
  }
2933
  return code;
252✔
2934
}
2935

2936
/*
 * Add one table name to the per-database request map.
 *
 * pHash       - map from full database name ("<acctId>.<db>") to STablesReq.
 * pos1/len1   - position/length in `str` of the first name part (table name, or database name
 *               when a second part exists).
 * pos2/len2   - position/length of the second name part (table name); len2 <= 0 means the name
 *               was not qualified and `db` supplies the database.
 * db          - default database name; must be non-NULL for unqualified names.
 *
 * Returns TSDB_CODE_SUCCESS, -1 for an invalid name, or an error code on allocation / hash
 * failure. A table array created here is released again if it cannot be stored in the map.
 */
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
                      int32_t acctId, char* db) {
  SName name = {0};

  if (len1 <= 0) {
    return -1;
  }

  const char* dbName = db;
  const char* tbName = NULL;
  int32_t     dbLen = 0;
  int32_t     tbLen = 0;
  if (len2 > 0) {
    dbName = str + pos1;
    dbLen = len1;
    tbName = str + pos2;
    tbLen = len2;
  } else {
    if (db == NULL) {
      // unqualified table name without a default database
      return -1;
    }
    dbLen = (int32_t)strlen(db);
    tbName = str + pos1;
    tbLen = len1;
  }

  if (dbLen <= 0 || tbLen <= 0) {
    return -1;
  }

  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
    return -1;
  }

  if (tNameAddTbName(&name, tbName, tbLen)) {
    return -1;
  }

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);

  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
  if (pDb) {
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  // first table of this database: create the entry
  STablesReq req = {0};
  req.pTables = taosArrayInit(20, sizeof(SName));
  if (NULL == req.pTables) {
    return terrno;
  }
  tstrncpy(req.dbFName, dbFName, TSDB_DB_FNAME_LEN);

  if (NULL == taosArrayPush(req.pTables, &name)) {
    int32_t code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    taosArrayDestroy(req.pTables);
    return code;
  }

  int32_t code = taosHashPut(pHash, dbFName, strlen(dbFName), &req, sizeof(req));
  if (code != TSDB_CODE_SUCCESS) {
    // the map did not take ownership, so the array must be released here
    taosArrayDestroy(req.pTables);
    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2994

2995
/*
 * Parse a comma separated list of table names ("tb", "db.tb", with optional `backquoted` parts
 * and surrounding whitespace) and group the tables by database.
 *
 * tbList - NUL terminated list text.
 * acctId - account id used to build full database names.
 * dbName - default database for unqualified table names.
 * pReq   - on success receives a new array of STablesReq, one entry per database; each entry owns
 *          its pTables array. On failure *pReq is left untouched and nothing is leaked.
 *
 * Returns TSDB_CODE_SUCCESS, terrno if the working hash cannot be created, or
 * TSDB_CODE_TSC_INVALID_OPERATION (also stored in terrno) for any other failure.
 */
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return terrno;
  }

  bool    inEscape = false;
  int32_t code = 0;
  void*   pIter = NULL;
  SArray* pList = NULL;

  // vPos/vLen describe up to two name parts of the entry being scanned: [db.]table
  int32_t vIdx = 0;
  int32_t vPos[2];
  int32_t vLen[2];

  (void)memset(vPos, -1, sizeof(vPos));
  (void)memset(vLen, 0, sizeof(vLen));

  for (int32_t i = 0;; ++i) {
    char c = *(tbList + i);

    if (0 == c) {
      // end of input closes the last entry
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      break;
    }

    if ('`' == c) {
      inEscape = !inEscape;
      if (!inEscape) {
        // closing backquote: the quoted part must not be empty
        if (vPos[vIdx] >= 0) {
          vLen[vIdx] = i - vPos[vIdx];
        } else {
          goto _return;
        }
      }

      continue;
    }

    if (inEscape) {
      // any character is allowed inside backquotes
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    if ('.' == c) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      vIdx++;
      if (vIdx >= 2) {
        goto _return;  // at most db.table
      }
      continue;
    }

    if (',' == c) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      (void)memset(vPos, -1, sizeof(vPos));
      (void)memset(vLen, 0, sizeof(vLen));
      vIdx = 0;
      continue;
    }

    if (' ' == c || '\r' == c || '\t' == c || '\n' == c) {
      // whitespace terminates the name part that is currently open
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      continue;
    }

    if (('a' <= c && 'z' >= c) || ('A' <= c && 'Z' >= c) || ('0' <= c && '9' >= c) || ('_' == c)) {
      if (vLen[vIdx] > 0) {
        goto _return;  // identifier characters after the part was already closed
      }
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    goto _return;
  }

  pList = taosArrayInit(taosHashGetSize(pHash), sizeof(STablesReq));
  if (NULL == pList) {
    goto _return;
  }

  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    if (NULL == taosArrayPush(pList, pDb)) {
      goto _return;
    }
    pIter = taosHashIterate(pHash, pIter);
  }

  // the entries copied into pList now own the pTables arrays
  taosHashCleanup(pHash);

  *pReq = pList;
  return TSDB_CODE_SUCCESS;

_return:

  // every pTables array is still referenced from the hash; release them there exactly once
  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    taosArrayDestroy(pDb->pTables);
    pIter = taosHashIterate(pHash, pIter);
  }

  // pList only holds shallow copies of the hash entries, so a plain destroy is sufficient
  if (pList != NULL) {
    taosArrayDestroy(pList);
  }

  taosHashCleanup(pHash);

  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
  return TSDB_CODE_TSC_INVALID_OPERATION;
}
3133

3134
// Catalog completion callback used with an SSyncQueryParam: records `code` on
// the request referenced by `param` and posts the parameter's semaphore.
// `pResult` is not used by this callback.
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
  SSyncQueryParam* pSync = (SSyncQueryParam*)param;

  pSync->pRequest->code = code;

  if (tsem_post(&pSync->sem) != TSDB_CODE_SUCCESS) {
    tscError("failed to post semaphore since %s", tstrerror(terrno));
  }
}
1,194✔
3142

3143
void syncQueryFn(void* param, void* res, int32_t code) {
950,088,399✔
3144
  SSyncQueryParam* pParam = param;
950,088,399✔
3145
  pParam->pRequest = res;
950,088,399✔
3146

3147
  if (pParam->pRequest) {
950,094,269✔
3148
    pParam->pRequest->code = code;
950,087,989✔
3149
    clientOperateReport(pParam->pRequest);
950,096,334✔
3150
  }
3151

3152
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
950,012,918✔
3153
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3154
  }
3155
}
950,142,163✔
3156

3157
// Start an asynchronous query on connection `connId`.
//   sql          - NUL-terminated statement; must not be NULL and must not be
//                  longer than tsMaxSQLLength
//   fp           - completion callback; must not be NULL
//   param        - passed to buildRequest and to `fp` on failure
//   validateOnly - forwarded to buildRequest
//   source       - stored on the request before it is executed
// On any failure terrno is set and, when `fp` is non-NULL, `fp(param, NULL,
// terrno)` is invoked. On success the request is handed to doAsyncQuery.
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                        int8_t source) {
  SRequestObj* pRequest = NULL;
  int32_t      code = TSDB_CODE_SUCCESS;
  size_t       sqlLen = 0;

  if (NULL == fp) {
    // there is no callback to notify, so only record the error
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    code = TSDB_CODE_INVALID_PARA;
    goto _fail;
  }

  sqlLen = strlen(sql);
  if (sqlLen > (size_t)tsMaxSQLLength) {
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, tsMaxSQLLength);
    code = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _fail;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);

  code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    goto _fail;
  }

  // NOTE(review): when this check fails the request built above is not
  // released on this path - confirm whether it has to be destroyed here.
  code = connCheckAndUpateMetric(connId);
  if (TSDB_CODE_SUCCESS != code) {
    goto _fail;
  }

  pRequest->source = source;
  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
  return;

_fail:
  // common failure exit: publish the code through terrno and the callback
  terrno = code;
  fp(param, NULL, terrno);
}
3197

3198
// Start an asynchronous query on connection `connId`, passing the
// caller-supplied `reqid` to buildRequest.
//   sql          - NUL-terminated statement; must not be NULL and must not be
//                  longer than tsMaxSQLLength
//   fp           - completion callback; must not be NULL
//   param        - passed to buildRequest and to `fp` on failure
//   validateOnly - forwarded to buildRequest
// On any failure terrno is set and, when `fp` is non-NULL, `fp(param, NULL,
// terrno)` is invoked. On success the request is handed to doAsyncQuery.
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                                 int64_t reqid) {
  SRequestObj* pRequest = NULL;
  int32_t      code = TSDB_CODE_SUCCESS;
  size_t       sqlLen = 0;

  if (NULL == fp) {
    // there is no callback to notify, so only record the error
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    code = TSDB_CODE_INVALID_PARA;
    goto _fail;
  }

  sqlLen = strlen(sql);
  if (sqlLen > (size_t)tsMaxSQLLength) {
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid, tsMaxSQLLength);
    code = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    goto _fail;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);

  code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
  if (TSDB_CODE_SUCCESS != code) {
    goto _fail;
  }

  // NOTE(review): when this check fails the request built above is not
  // released on this path - confirm whether it has to be destroyed here.
  code = connCheckAndUpateMetric(connId);
  if (TSDB_CODE_SUCCESS != code) {
    goto _fail;
  }

  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
  return;

_fail:
  // common failure exit: publish the code through terrno and the callback
  terrno = code;
  fp(param, NULL, terrno);
}
3239

3240
// Blocking query: submits `sql` through taosAsyncQueryImpl with syncQueryFn as
// the callback and waits on a semaphore until that callback has run.
//   taos         - connection handle; NULL yields TSDB_CODE_TSC_DISCONNECTED
//   sql          - statement text, forwarded unchanged
//   validateOnly - forwarded to taosAsyncQueryImpl
//   source       - forwarded to taosAsyncQueryImpl
// Returns the request delivered to the callback (marked as a sync query), or
// NULL when no request was delivered or a setup/wait step failed.
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
  if (taos == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* pSync = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (pSync == NULL) {
    return NULL;
  }

  if (tsem_init(&pSync->sem, 0, 0) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, pSync, validateOnly, source);

  if (tsem_wait(&pSync->sem) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  // a failed destroy is only logged; the result is still returned
  int32_t destroyCode = tsem_destroy(&pSync->sem);
  if (destroyCode != TSDB_CODE_SUCCESS) {
    tscError("failed to destroy semaphore since %s", tstrerror(destroyCode));
  }

  SRequestObj* pResult = pSync->pRequest;
  if (pResult != NULL) {
    pResult->syncQuery = true;
    pResult->inCallback = false;
  }

  taosMemoryFree(pSync);
  return pResult;
}
3281

3282
// Blocking query with a caller-supplied request id: submits `sql` through
// taosAsyncQueryImplWithReqid with syncQueryFn as the callback and waits on a
// semaphore until that callback has run.
//   taos         - connection handle; NULL yields TSDB_CODE_TSC_DISCONNECTED
//   sql          - statement text, forwarded unchanged
//   validateOnly - forwarded to taosAsyncQueryImplWithReqid
//   reqid        - forwarded to taosAsyncQueryImplWithReqid
// Returns the request delivered to the callback (marked as a sync query), or
// NULL when no request was delivered or a setup/wait step failed.
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (param == NULL) {
    return NULL;
  }

  int32_t code = tsem_init(&param->sem, 0, 0);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);

  code = tsem_wait(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  // Release the semaphore created above before its storage is freed, as
  // taosQueryImpl does; a failed destroy is only logged.
  code = tsem_destroy(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  SRequestObj* pRequest = NULL;
  if (param->pRequest != NULL) {
    param->pRequest->syncQuery = true;
    pRequest = param->pRequest;
    // same reset that taosQueryImpl applies before returning a sync result
    param->pRequest->inCallback = false;
  }
  taosMemoryFree(param);

  return pRequest;
}
3316

3317
static void fetchCallback(void* pResult, void* param, int32_t code) {
276,411,981✔
3318
  SRequestObj* pRequest = (SRequestObj*)param;
276,411,981✔
3319

3320
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
276,411,981✔
3321

3322
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
276,411,981✔
3323
           tstrerror(code), pRequest->requestId);
3324

3325
  pResultInfo->pData = pResult;
276,411,981✔
3326
  pResultInfo->numOfRows = 0;
276,411,923✔
3327

3328
  if (code != TSDB_CODE_SUCCESS) {
276,411,826✔
3329
    pRequest->code = code;
×
3330
    taosMemoryFreeClear(pResultInfo->pData);
×
3331
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3332
    return;
×
3333
  }
3334

3335
  if (pRequest->code != TSDB_CODE_SUCCESS) {
276,411,826✔
3336
    taosMemoryFreeClear(pResultInfo->pData);
×
3337
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3338
    return;
×
3339
  }
3340

3341
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
281,968,746✔
3342
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
276,411,918✔
3343
  if (pRequest->code != TSDB_CODE_SUCCESS) {
276,411,923✔
3344
    pResultInfo->numOfRows = 0;
64✔
3345
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
64✔
3346
             tstrerror(pRequest->code), pRequest->requestId);
3347
  } else {
3348
    tscDebug(
276,409,674✔
3349
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3350
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3351

3352
    STscObj*            pTscObj = pRequest->pTscObj;
276,410,190✔
3353
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
276,411,802✔
3354
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
276,411,874✔
3355
  }
3356

3357
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
276,411,923✔
3358
}
3359

3360
// Fetch the next batch of rows for `pRequest` asynchronously.
//   fp    - user fetch callback, stored on the request
//   param - user parameter, stored in the request's internal parameter
// The callback is invoked directly with a row count when the request has no
// result fields, already carries an error, or has completed; otherwise the
// fetch is handed to the scheduler with fetchCallback as its completion hook.
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
  SReqResultInfo* pResInfo = &pRequest->body.resInfo;

  pRequest->body.fetchFp = fp;
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;

  // nothing to fetch: the query has no result set or has already failed
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    pResInfo->numOfRows = 0;
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
    pRequest->body.fetchFp(param, pRequest, pResInfo->numOfRows);
    return;
  }

  // everything has been delivered to the application already
  if (pResInfo->completed) {
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);

    if (QUERY_EXEC_MODE_SCHEDULE == pRequest->body.execMode) {
      pResInfo->numOfRows = 0;
    } else if (pResInfo->localResultFetched) {
      // locally executed query whose rows were already handed out once
      pResInfo->numOfRows = 0;
      pResInfo->current = 0;
    } else {
      // first fetch of a locally executed query: report the rows as they are
      pResInfo->localResultFetched = true;
    }

    pRequest->body.fetchFp(param, pRequest, pResInfo->numOfRows);
    return;
  }

  SSchedulerReq schedReq = {0};
  schedReq.syncReq = false;
  schedReq.fetchFp = fetchCallback;
  schedReq.cbParam = pRequest;

  if (schedulerFetchRows(pRequest->body.queryJob, &schedReq) != TSDB_CODE_SUCCESS) {
    // NOTE(review): the user callback is not invoked on this path - confirm
    // that the failure is reported to the caller some other way.
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
  }
}
3406

3407
// Deliver the final result `code` of `pRequest` to its query callback.
// The request is flagged as in-callback for the duration of the call. After
// the callback returns, the flag is cleared only if the request can still be
// acquired by its reference id.
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
  pRequest->inCallback = true;

  // remember the reference id now; the request is looked up again by this id
  // after the callback instead of being touched through the pointer
  int64_t refId = pRequest->self;

  bool tableMissing = (TSDB_CODE_PAR_TABLE_NOT_EXIST == code) || (TSDB_CODE_TDB_TABLE_NOT_EXIST == code);
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && tableMissing) {
    // configured to report a query on a missing table as an empty result
    code = TSDB_CODE_SUCCESS;
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
  }

  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
           pRequest);

  if (NULL != pRequest->body.queryFp) {
    SSyncQueryParam* pInter = (SSyncQueryParam*)pRequest->body.interParam;
    pRequest->body.queryFp(pInter->userParam, pRequest, code);
  }

  SRequestObj* pAlive = acquireRequest(refId);
  if (NULL != pAlive) {
    pAlive->inCallback = false;
    (void)releaseRequest(refId);
  }
}
950,170,867✔
3430

3431
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
523,253✔
3432
                       SParseSqlRes* pRes) {
3433
#ifndef TD_ENTERPRISE
3434
  return TSDB_CODE_SUCCESS;
3435
#else
3436
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
523,253✔
3437
#endif
3438
}
3439

3440
// Stamp `pInfo` with the current time in milliseconds: startTime is set only
// while it is still 0, lastAccessTime is always refreshed. A NULL `pInfo` is
// ignored.
void updateConnAccessInfo(SConnAccessInfo* pInfo) {
  if (NULL != pInfo) {
    int64_t nowMs = taosGetTimestampMs();

    if (0 == pInfo->startTime) {
      pInfo->startTime = nowMs;
    }
    pInfo->lastAccessTime = nowMs;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc