• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4988

16 Mar 2026 12:26PM UTC coverage: 75.821% (+1.9%) from 73.883%
#4988

push

travis-ci

web-flow
feat: support secure delete option. (#34591)

274 of 464 new or added lines in 29 files covered. (59.05%)

4404 existing lines in 23 files now uncovered.

337108 of 444611 relevant lines covered (75.82%)

146708292.94 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.6
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "command.h"
22
#include "decimal.h"
23
#include "scheduler.h"
24
#include "tdatablock.h"
25
#include "tdataformat.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tmisce.h"
29
#include "tmsg.h"
30
#include "tmsgtype.h"
31
#include "tpagedbuf.h"
32
#include "tref.h"
33
#include "tsched.h"
34
#include "tversion.h"
35

36
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
37
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode);
38

39
void setQueryRequest(int64_t rId) {
1,276,797,900✔
40
  SRequestObj* pReq = acquireRequest(rId);
1,276,797,900✔
41
  if (pReq != NULL) {
1,276,872,788✔
42
    pReq->isQuery = true;
1,276,834,762✔
43
    (void)releaseRequest(rId);
1,276,834,786✔
44
  }
45
}
1,276,855,248✔
46

47
// Returns true only when str is non-NULL, non-empty and at most maxsize
// characters long (the terminator is not counted).
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (NULL == str) {
    return false;
  }

  const size_t length = strlen(str);
  return length > 0 && length <= maxsize;
}
59

60
// A user name is accepted when it is non-empty and no longer than TSDB_USER_LEN - 1 characters.
static bool validateUserName(const char* user) {
  const size_t maxUserLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxUserLen);
}
167,974,488✔
61

62
// A password is accepted when it is non-empty and no longer than TSDB_PASSWORD_MAX_LEN characters.
static bool validatePassword(const char* passwd) {
  const size_t maxPasswdLen = TSDB_PASSWORD_MAX_LEN;
  return stringLengthCheck(passwd, maxPasswdLen);
}
167,974,710✔
63

64
// A database name is accepted when it is non-empty and no longer than TSDB_DB_NAME_LEN - 1 characters.
static bool validateDbName(const char* db) {
  const size_t maxDbLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxDbLen);
}
1,034,994✔
65

66
// Format the lookup key for an app instance and return a copy made with taosStrdup().
// With a user the key is "user:auth:ip:port"; without one it is "auth:ip:port".
// The key is formatted into a 512-byte buffer, so longer input is truncated by snprintf.
// NOTE(review): the caller presumably owns the returned string - confirm against taosStrdup.
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  char buf[512] = {0};

  if (NULL != user) {
    (void)snprintf(buf, sizeof(buf), "%s:%s:%s:%d", user, auth, ip, port);
  } else {
    (void)snprintf(buf, sizeof(buf), "%s:%s:%d", auth, ip, port);
  }

  return taosStrdup(buf);
}
75

76
// Copy src into dst, replacing the characters '"', '\\', '\b', '\f', '\n', '\r' and '\t'
// with their two-character backslash escapes.
//
//   dst           destination buffer; no terminating '\0' is written
//   maxDstLength  capacity of dst in bytes; never exceeded
//   src           source bytes (need not be NUL-terminated)
//   srcLength     number of bytes to read from src
//
// Returns the number of bytes written to dst. When dst is too small the output is
// truncated at a character boundary: an escape sequence is either written completely
// or not at all. Returns 0 when dst or src is NULL or srcLength is 0.
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
  if (dst == NULL || src == NULL || srcLength == 0) {
    return 0;
  }

  size_t dstLength = 0;
  for (size_t i = 0; i < srcLength; ++i) {
    char escaped = 0;  // second character of the escape sequence, 0 when src[i] is copied verbatim
    switch (src[i]) {
      case '\"':
        escaped = '\"';
        break;
      case '\\':
        escaped = '\\';
        break;
      case '\b':
        escaped = 'b';
        break;
      case '\f':
        escaped = 'f';
        break;
      case '\n':
        escaped = 'n';
        break;
      case '\r':
        escaped = 'r';
        break;
      case '\t':
        escaped = 't';
        break;
      default:
        break;
    }

    size_t need = (escaped != 0) ? 2 : 1;
    // dstLength never exceeds maxDstLength, so the subtraction cannot wrap
    if (maxDstLength - dstLength < need) {
      break;
    }

    if (escaped != 0) {
      dst[dstLength++] = '\\';
      dst[dstLength++] = escaped;
    } else {
      dst[dstLength++] = src[i];
    }
  }

  return (int32_t)dstLength;
}
132

133
bool chkRequestKilled(void* param) {
2,147,483,647✔
134
  bool         killed = false;
2,147,483,647✔
135
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
136
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647✔
137
    killed = true;
5,258✔
138
  }
139

140
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
141

142
  return killed;
2,147,483,647✔
143
}
144

145
void cleanupAppInfo() {
2,675,332✔
146
  taosHashCleanup(appInfo.pInstMap);
2,675,332✔
147
  taosHashCleanup(appInfo.pInstMapByClusterId);
2,675,332✔
148
  tscInfo("cluster instance map cleaned");
2,675,332✔
149
}
2,675,332✔
150

151
static int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db,
152
                               __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType,
153
                               STscObj** pTscObj);
154

155
// Connect to the server with an already computed auth string.
//
//   ip        server endpoint; when NULL the configured tsFirst/tsSecond endpoints are used
//   user      user name; when NULL, auth must be a token of exactly TSDB_TOKEN_LEN - 1 characters
//   auth      auth string (or token) handed to the transporter and used in the instance key
//   totp      optional decimal TOTP code in [0, 999999]; NULL is passed on as -1
//   db        optional database name; it is length-checked and de-quoted into a local copy
//   port      when non-zero, overrides the port of the first two endpoints
//   connType  forwarded to taosConnectImpl
//   pObj      forwarded to taosConnectImpl
//
// The app instance for the (user, auth, ip, port) key is looked up in appInfo.pInstMap under
// appInfo.mutex and created when missing. Returns the result of taosConnectImpl on success,
// otherwise the error code of the step that failed.
int32_t taos_connect_by_auth(const char* ip, const char* user, const char* auth, const char* totp,
                                    const char* db, uint16_t port, int connType, STscObj** pObj) {
  TSC_ERR_RET(taos_init());

  if (user == NULL) {
    // token login: the auth string must have exactly the token length
    if (auth == NULL || strlen(auth) != (TSDB_TOKEN_LEN - 1)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOKEN);
    }
  } else if (!validateUserName(user)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
  }
  int32_t code = 0;

  char localDb[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL && strlen(db) > 0) {
    if (!validateDbName(db)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
    }

    tstrncpy(localDb, db, sizeof(localDb));
    (void)strdequote(localDb);
  }

  int32_t totpCode = -1;
  if (totp != NULL) {
    char* endptr = NULL;
    totpCode = taosStr2Int32(totp, &endptr, 10);
    // reject empty input, trailing characters and values outside six decimal digits
    if (endptr == totp || *endptr != '\0' || totpCode < 0 || totpCode > 999999) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOTP_CODE);
    }
  }

  SCorEpSet epSet = {0};
  if (ip) {
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
  } else {
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
  }

  if (port) {
    epSet.epSet.eps[0].port = port;
    epSet.epSet.eps[1].port = port;
  }

  // ip may legitimately be NULL here. Passing NULL to a "%s" conversion is undefined, so the
  // text that common C libraries print for it is substituted explicitly, which keeps the key
  // unchanged on those platforms.
  char* key = getClusterKey(user, auth, ip ? ip : "(null)", port);
  if (NULL == key) {
    TSC_ERR_RET(terrno);
  }
  // NOTE(review): key embeds the auth string and is written to the log below - confirm this is intended.
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
          user ? user : "", db ? db : "", key);
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
  }

  SAppInstInfo** pInst = NULL;
  code = taosThreadMutexLock(&appInfo.mutex);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    taosMemoryFreeClear(key);  // key is still owned by this function on this path
    TSC_ERR_RET(code);
  }

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
  SAppInstInfo* p = NULL;
  if (pInst == NULL) {
    // first connection for this key: build a new app instance
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
    if (NULL == p) {
      TSC_ERR_JRET(terrno);
    }
    p->mgmtEp = epSet;
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = openTransporter(user, auth, tsNumOfCores / 2, &p->pTransporter);
    if (TSDB_CODE_SUCCESS != code) {
      // NOTE(review): qnodeMutex was initialised above and is not destroyed on this path.
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    // ownership of key moves to the instance; clearing it keeps _return from freeing it
    p->instKey = key;
    key = NULL;
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user ? user : "", epSet.epSet.eps[0].fqdn,
            epSet.epSet.eps[0].port);

    pInst = &p;
  } else {
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
    }
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    (void)taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    return code;
  } else {
    code = taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
      return code;
    }
    return taosConnectImpl(user, auth, totpCode, localDb, NULL, NULL, *pInst, connType, pObj);
  }
}
275

276
// Connect with a plain-text password: validate its length, turn it into an auth string with
// taosEncryptPass_c() and continue through taos_connect_by_auth().
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
                              uint16_t port, int connType, STscObj** pObj) {
  if (!validatePassword(pass)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
  }

  char encrypted[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t*)pass, strlen(pass), encrypted);

  return taos_connect_by_auth(ip, user, encrypted, totp, db, port, connType, pObj);
}
286

287
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
288
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
289
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
290
//     return *ppAppInstInfo;
291
//   } else {
292
//     return NULL;
293
//   }
294
// }
295

296
// Destroy the semaphore embedded in param and free param itself; NULL is ignored.
// A failure to destroy the semaphore is logged and the memory is freed anyway.
void freeQueryParam(SSyncQueryParam* param) {
  if (NULL == param) {
    return;
  }

  int32_t semCode = tsem_destroy(&param->sem);
  if (TSDB_CODE_SUCCESS != semCode) {
    tscError("failed to destroy semaphore in freeQueryParam");
  }

  taosMemoryFree(param);
}
303

304
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
1,854,014,352✔
305
                     SRequestObj** pRequest, int64_t reqid) {
306
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
1,854,014,352✔
307
  if (TSDB_CODE_SUCCESS != code) {
1,854,022,132✔
308
    tscError("failed to malloc sqlObj, %s", sql);
×
309
    return code;
×
310
  }
311

312
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
1,854,022,132✔
313
  if ((*pRequest)->sqlstr == NULL) {
1,854,020,714✔
314
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
315
    destroyRequest(*pRequest);
×
316
    *pRequest = NULL;
×
317
    return terrno;
×
318
  }
319

320
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
1,854,007,502✔
321
  (*pRequest)->sqlstr[sqlLen] = 0;
1,854,038,298✔
322
  (*pRequest)->sqlLen = sqlLen;
1,854,038,090✔
323
  (*pRequest)->validateOnly = validateSql;
1,854,038,996✔
324
  (*pRequest)->stmtBindVersion = 0;
1,854,033,342✔
325

326
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
1,854,036,644✔
327

328
  STscObj* pTscObj = (*pRequest)->pTscObj;
1,854,033,308✔
329
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
1,854,028,776✔
330
                             sizeof((*pRequest)->self));
331
  if (err) {
1,854,034,364✔
332
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
333
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
334
    destroyRequest(*pRequest);
×
335
    *pRequest = NULL;
×
336
    return terrno;
×
337
  }
338

339
  (*pRequest)->allocatorRefId = -1;
1,854,034,364✔
340
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
1,854,031,622✔
341
    if (TSDB_CODE_SUCCESS !=
869,909,612✔
342
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
869,896,340✔
343
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
344
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
345
      destroyRequest(*pRequest);
×
346
      *pRequest = NULL;
×
347
      return terrno;
×
348
    }
349
  }
350

351
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
1,854,042,952✔
352
  return TSDB_CODE_SUCCESS;
1,854,017,190✔
353
}
354

355
// Build a sub-request for sql on the same connection as pRequest and link the two:
// pRequest records the new request as its previous one, and the new request records
// pRequest as both its next and its user request.
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
  int32_t code =
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pPrev = *pNewRequest;
  pRequest->relation.prevRefId = pPrev->self;
  pPrev->relation.nextRefId = pRequest->self;
  pPrev->relation.userRefId = pRequest->self;
  pPrev->isSubReq = true;

  return code;
}
366

367
// Parse the SQL text held by pRequest into *pQuery.
// On success with a result set, the result schema and precision are copied into the request.
// On success, or on an error matching NEED_CLIENT_HANDLE_ERROR, the db/table/target-table
// lists are swapped from the query into the request.
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj*      pTscObj = pRequest->pTscObj;
  SAppInstInfo* pAppInfo = pTscObj->pAppInfo;

  SParseContext cxt = {
      .requestId = pRequest->requestId,
      .requestRid = pRequest->self,
      .acctId = pTscObj->acctId,
      .db = pRequest->pDb,
      .topicQuery = topicQuery,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pAppInfo->pTransporter,
      .pStmtCb = pStmtCb,
      .pUser = pTscObj->user,
      .userId = pTscObj->userId,
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
      .enableSysInfo = pTscObj->sysInfo,
      .svrVer = pTscObj->sVer,
      .nodeOffline = (pAppInfo->onlineDnodes < pAppInfo->totalDnodes),
      .stmtBindVersion = pRequest->stmtBindVersion,
      .setQueryFp = setQueryRequest,
      .timezone = pTscObj->optionInfo.timezone,
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
  };

  cxt.mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = qParseSql(&cxt, pQuery);
  if (TSDB_CODE_SUCCESS == code && (*pQuery)->haveResultSet) {
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    code = setResSchemaInfo(pResInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols, (*pQuery)->pResExtSchema,
                            pRequest->stmtBindVersion > 0);
    setResPrecision(pResInfo, (*pQuery)->precision);
  }

  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
  }

  // the parse context's position arrays are released here
  taosArrayDestroy(cxt.pTableMetaPos);
  taosArrayDestroy(cxt.pTableVgroupPos);

  return code;
}
420

421
// Run a locally executable command synchronously and, when it produced a response,
// load that response into the request's result info.
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  STscObj*           pTscObj = pRequest->pTscObj;
  SReqResultInfo*    pResInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRsp = NULL;
  int8_t             biMode = atomic_load_8(&pTscObj->biMode);

  int32_t code =
      qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode, pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS != code || NULL == pRsp) {
    return code;
  }

  return setQueryResultFromRsp(pResInfo, pRsp, pResInfo->convertUcs4, pRequest->stmtBindVersion > 0);
}
433

434
// Send the DDL message prepared in pQuery to the server and block on the request's
// response semaphore. A query without a command message succeeds immediately.
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (NULL == pCmd) {
    // drop table if exists not_exists_table
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pCmd->epSet, NULL, pSendInfo));
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
  return TSDB_CODE_SUCCESS;
}
453

454
// Return the app instance info of the connection that owns pRequest.
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
  STscObj* pTscObj = pRequest->pTscObj;
  return pTscObj->pAppInfo;
}
2,147,483,647✔
455

456
// Run a locally executable command and report the outcome through the request callback.
// A validate-only request is completed with code 0 without running the command.
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  STscObj*           pTscObj = pRequest->pTscObj;
  SReqResultInfo*    pResultInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRsp = NULL;

  int32_t code = qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, atomic_load_8(&pTscObj->biMode),
                              pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
    code = setQueryResultFromRsp(pResultInfo, pRsp, pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
  }

  pRequest->code = code;

  if (pRequest->code == TSDB_CODE_SUCCESS) {
    tscDebug(
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
  } else {
    // on failure the result is reported as empty
    pResultInfo->numOfRows = 0;
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
             pRequest->requestId);
  }

  doRequestCallback(pRequest, code);
}
485

486
// Send the DDL message prepared in pQuery to the server without waiting for the reply.
// Validate-only requests and queries without a command message are completed right away
// with code 0. A send failure is reported through the request callback and also returned.
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  // the second case covers e.g. "drop table if exists not_exists_table"
  if (pRequest->validateOnly || NULL == pQuery->pCmdMsg) {
    doRequestCallback(pRequest, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  SAppInstInfo* pInst = getAppInfo(pRequest);
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int32_t code = asyncSendMsgToServer(pInst->pTransporter, &pCmd->epSet, NULL, pSendInfo);
  if (code) {
    doRequestCallback(pRequest, code);
  }
  return code;
}
512

513
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
224,018✔
514
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
224,018✔
515
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
224,018✔
516

517
  if (node1->load < node2->load) {
224,018✔
518
    return -1;
×
519
  }
520

521
  return node1->load > node2->load;
224,018✔
522
}
523

524
// Replace the cached qnode list of pInfo with a sorted copy of pNodeList.
//
//   pInfo      app instance whose pQnodeList is replaced (guarded by pInfo->qnodeMutex)
//   pNodeList  new list; when NULL the cached list is only cleared
//
// Returns TSDB_CODE_SUCCESS, a mutex lock/unlock error, or the error left in terrno when
// the list could not be duplicated. In the last case the cached list stays cleared.
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  int32_t code = TSDB_CODE_SUCCESS;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    taosArrayDestroy(pInfo->pQnodeList);
    pInfo->pQnodeList = NULL;
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
  }

  if (pNodeList) {
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
    if (NULL == pInfo->pQnodeList) {
      // duplication failed: do not hand a NULL array to the sort
      code = terrno;
    } else {
      taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
      tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%" PRId64, pInfo->clusterId,
               (int64_t)taosArrayGetSize(pInfo->pQnodeList));
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    // the duplication error takes precedence over any unlock error
    (void)taosThreadMutexUnlock(&pInfo->qnodeMutex);
    return code;
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));

  return TSDB_CODE_SUCCESS;
}
542

543
// Tell the caller whether a qnode list still has to be fetched for this request.
// Under the vnode and client query policies no qnode list is needed. Otherwise one is
// required exactly when the app instance has no cached list yet.
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
  *required = false;

  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;

  TSC_ERR_RET(taosThreadMutexLock(&pInst->qnodeMutex));
  *required = (NULL == pInst->pQnodeList);
  TSC_ERR_RET(taosThreadMutexUnlock(&pInst->qnodeMutex));

  return TSDB_CODE_SUCCESS;
}
558

559
// Provide a qnode list for pRequest in *pNodeList.
//
// A copy of the list cached in the app instance is used when one exists. Otherwise, when
// *pNodeList is still NULL, a new array is created, filled through the catalog and, on
// success, stored as the new cached list via updateQnodeList().
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing step. An array created here
// is left in *pNodeList even when the catalog lookup fails.
// NOTE(review): *pNodeList is read before it is necessarily written, so callers are
// presumably expected to initialise it to NULL - confirm against callers.
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
  if (NULL == *pNodeList) {
    SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
      // check the freshly created array, not the out-parameter itself
      if (NULL == *pNodeList) {
        return terrno;
      }
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
    }

    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
      code = updateQnodeList(pInfo, *pNodeList);
    }
  }

  return code;
}
590

591
// Create the query plan for pQuery. The request type is taken from the query's message
// type and the planner context is filled from the request and its connection.
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SAppInstInfo* pInst = getAppInfo(pRequest);
  STscObj*      pTscObj = pRequest->pTscObj;

  SPlanContext cxt = {
      .queryId = pRequest->requestId,
      .acctId = pTscObj->acctId,
      .mgmtEpSet = getEpSet_s(&pInst->mgmtEp),
      .pAstRoot = pQuery->pRoot,
      .showRewrite = pQuery->showRewrite,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pUser = pTscObj->user,
      .userId = pTscObj->userId,
      .timezone = pTscObj->optionInfo.timezone,
      .sysInfo = pTscObj->sysInfo,
  };

  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
609

610
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
464,608,084✔
611
                         const SExtSchema* pExtSchema, bool isStmt) {
612
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
464,608,084✔
613
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
90✔
614
    return TSDB_CODE_INVALID_PARA;
×
615
  }
616

617
  pResInfo->numOfCols = numOfCols;
464,636,638✔
618
  if (pResInfo->fields != NULL) {
464,641,528✔
619
    taosMemoryFree(pResInfo->fields);
27,212✔
620
  }
621
  if (pResInfo->userFields != NULL) {
464,635,258✔
622
    taosMemoryFree(pResInfo->userFields);
27,212✔
623
  }
624
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
464,636,824✔
625
  if (NULL == pResInfo->fields) return terrno;
464,617,610✔
626
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
464,617,134✔
627
  if (NULL == pResInfo->userFields) {
464,630,242✔
628
    taosMemoryFree(pResInfo->fields);
×
629
    return terrno;
×
630
  }
631
  if (numOfCols != pResInfo->numOfCols) {
464,630,926✔
632
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
633
    return TSDB_CODE_FAILED;
×
634
  }
635

636
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
2,147,483,647✔
637
    pResInfo->fields[i].type = pSchema[i].type;
1,823,484,214✔
638

639
    pResInfo->userFields[i].type = pSchema[i].type;
1,823,485,022✔
640
    // userFields must convert to type bytes, no matter isStmt or not
641
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
1,823,480,256✔
642
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
1,823,485,028✔
643
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
1,823,504,920✔
644
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
2,734,856✔
645
    }
646

647
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
1,823,516,808✔
648
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
1,823,528,026✔
649
  }
650
  return TSDB_CODE_SUCCESS;
464,677,700✔
651
}
652

653
// Store the time precision in pResInfo; values other than milli, micro or nano are ignored.
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  if (precision == TSDB_TIME_PRECISION_MILLI || precision == TSDB_TIME_PRECISION_MICRO ||
      precision == TSDB_TIME_PRECISION_NANO) {
    pResInfo->precision = precision;
  }
}
661

662
// Build the execution node list for the vnode/client query policies.
// Every vgroup of every database in pDbVgList becomes one entry. When that yields nothing,
// the entries of pMnodeList are used instead; if that is empty too, the result is an
// empty list. On success *pNodeList receives the new array.
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
  SArray* pLoads = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pLoads) {
    return terrno;
  }
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";

  int32_t numOfDbs = taosArrayGetSize(pDbVgList);
  for (int32_t dbIdx = 0; dbIdx < numOfDbs; ++dbIdx) {
    SArray* pVgroups = taosArrayGetP(pDbVgList, dbIdx);
    if (NULL == pVgroups) {
      continue;
    }

    int32_t numOfVgs = taosArrayGetSize(pVgroups);
    for (int32_t vgIdx = 0; vgIdx < numOfVgs; ++vgIdx) {
      SVgroupInfo* pVgInfo = taosArrayGet(pVgroups, vgIdx);
      if (NULL == pVgInfo) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }

      SQueryNodeLoad entry = {0};
      entry.addr.nodeId = pVgInfo->vgId;
      entry.addr.epSet = pVgInfo->epSet;

      if (NULL == taosArrayPush(pLoads, &entry)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }
    }
  }

  int32_t vnodeNum = taosArrayGetSize(pLoads);
  if (vnodeNum > 0) {
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
  } else {
    // no vgroup was found: fall back to the mnode list
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
    } else {
      void* pFirst = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirst) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pLoads, pFirst, mnodeNum)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
    }
  }

  *pNodeList = pLoads;

  return TSDB_CODE_SUCCESS;
}
727

728
// Build the execution node list for the qnode query policy.
// The entries of pQnodeList are used when it is non-empty, otherwise those of pMnodeList;
// if both are empty the result is an empty list. On success *pNodeList receives the new array.
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
  SArray* pLoads = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pLoads) {
    return terrno;
  }

  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
  if (qNodeNum > 0) {
    void* pFirst = taosArrayGet(pQnodeList, 0);
    if (NULL == pFirst) {
      taosArrayDestroy(pLoads);
      return TSDB_CODE_OUT_OF_RANGE;
    }
    if (NULL == taosArrayAddBatch(pLoads, pFirst, qNodeNum)) {
      taosArrayDestroy(pLoads);
      return terrno;
    }
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
  } else {
    // no qnode available: fall back to the mnode list
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
    } else {
      void* pFirst = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirst) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pLoads, pFirst, mnodeNum)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
    }
  }

  *pNodeList = pLoads;

  return TSDB_CODE_SUCCESS;
}
773

774
void freeVgList(void* list) {
16,022,424✔
775
  SArray* pList = *(SArray**)list;
16,022,424✔
776
  taosArrayDestroy(pList);
16,031,930✔
777
}
16,005,806✔
778

779
// Build the execution node list for an asynchronously executed request, according to tsQueryPolicy.
//
//  - QUERY_POLICY_VNODE / QUERY_POLICY_CLIENT: collect the per-db vgroup lists and delegate to
//    buildVnodePolicyNodeList(). With pResultMeta the vgroup lists are taken from
//    pResultMeta->pDbVgroup and are NOT freed here (fp stays NULL); without it they are fetched
//    through the catalog and released with freeVgList.
//  - QUERY_POLICY_HYBRID / QUERY_POLICY_QNODE: duplicate the qnode list (from pResultMeta, or from
//    the app instance under qnodeMutex) and delegate to buildQnodePolicyNodeList().
//
// pRequest     request being scheduled
// pNodeList    [out] filled by the policy specific builder
// pMnodeList   mnode list passed through to the policy specific builder
// pResultMeta  optional catalog meta result; may be NULL
//
// Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_APP_ERROR for an unknown policy.
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  FDelete fp = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      if (pResultMeta) {
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
        for (int32_t i = 0; i < dbNum; ++i) {
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
          // skip missing entries and entries that failed or carry no vgroup list
          if (NULL == pRes || pRes->code || NULL == pRes->pRes) {
            continue;
          }

          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
            code = terrno;
            goto _return;
          }
        }
      } else {
        // vgroup lists fetched below are owned by pDbVgList and freed together with it
        fp = freeVgList;

        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
        if (dbNum > 0) {
          SCatalog*     pCtg = NULL;
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
          code = catalogGetHandle(pInst->clusterId, &pCtg);
          if (code != TSDB_CODE_SUCCESS) {
            goto _return;
          }

          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
          if (NULL == pDbVgList) {
            code = terrno;
            goto _return;
          }
          for (int32_t i = 0; i < dbNum; ++i) {
            SArray*          pVgList = NULL;
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                     .requestId = pRequest->requestId,
                                     .requestObjRefId = pRequest->self,
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

            // catalogGetDBVgList will handle dbFName == null.
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
            if (code) {
              goto _return;
            }

            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
              code = terrno;
              // not stored in pDbVgList, so the cleanup below would never see it
              taosArrayDestroy(pVgList);
              goto _return;
            }
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
        if (NULL == pRes || pRes->code) {
          pQnodeList = NULL;
        } else {
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            goto _return;
          }
        }
      } else {
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        int32_t       dupCode = TSDB_CODE_SUCCESS;

        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
        if (pInst->pQnodeList) {
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
          if (NULL == pQnodeList) {
            // remember the failure but fall through: the mutex must be released first
            dupCode = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
          }
        }
        int32_t unlockCode = taosThreadMutexUnlock(&pInst->qnodeMutex);

        if (TSDB_CODE_SUCCESS != dupCode) {
          code = dupCode;
          goto _return;
        }
        TSC_ERR_JRET(unlockCode);
      }

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:
  taosArrayDestroyEx(pDbVgList, fp);
  taosArrayDestroy(pQnodeList);

  return code;
}
888

889
// Build the execution node list for a synchronously executed request, according to tsQueryPolicy.
//
//  - QUERY_POLICY_VNODE / QUERY_POLICY_CLIENT: fetch the vgroup list of every db in
//    pRequest->dbList through the catalog and delegate to buildVnodePolicyNodeList().
//  - QUERY_POLICY_HYBRID / QUERY_POLICY_QNODE: fetch the qnode list with getQnodeList() and
//    delegate to buildQnodePolicyNodeList().
//
// pRequest    request being scheduled
// pNodeList   [out] filled by the policy specific builder
// pMnodeList  mnode list passed through to the policy specific builder
//
// Returns TSDB_CODE_SUCCESS or an error code; TSDB_CODE_APP_ERROR for an unknown policy.
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
      if (dbNum > 0) {
        SCatalog*     pCtg = NULL;
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        code = catalogGetHandle(pInst->clusterId, &pCtg);
        if (code != TSDB_CODE_SUCCESS) {
          goto _return;
        }

        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        for (int32_t i = 0; i < dbNum; ++i) {
          SArray*          pVgList = NULL;
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                   .requestId = pRequest->requestId,
                                   .requestObjRefId = pRequest->self,
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

          // catalogGetDBVgList will handle dbFName == null.
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
          if (code) {
            goto _return;
          }

          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
            code = terrno;
            // not stored in pDbVgList, so freeVgList in the cleanup below would never see it
            taosArrayDestroy(pVgList);
            goto _return;
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:

  // every vgroup list stored in pDbVgList was fetched here, so release them with the container
  taosArrayDestroyEx(pDbVgList, freeVgList);
  taosArrayDestroy(pQnodeList);

  return code;
}
954

955
// Execute a query plan through the scheduler with syncReq set and store the outcome in the request.
//
// pRequest   request being executed; its execRes, code and (for write requests) numOfRows are updated
// pDag       query plan handed to the scheduler
// pNodeList  candidate execution nodes handed to the scheduler
//
// Returns the final request code (also stored in pRequest->code and terrno).
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  SExecResult      res = {0};
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self};
  SSchedulerReq    req = {
         .syncReq       = true,
         .localReq      = (tsQueryPolicy == QUERY_POLICY_CLIENT),
         .pConn         = &conn,
         .pNodeList     = pNodeList,
         .pDag          = pDag,
         .sql           = pRequest->sqlstr,
         .startTs       = pRequest->metric.start,
         .execFp        = NULL,
         .cbParam       = NULL,
         .chkKillFp     = chkRequestKilled,
         .chkKillParam  = (void*)pRequest->self,
         .pExecRes      = &res,
         .source        = pRequest->source,
         .secureDelete  = pRequest->secureDelete,
         .pWorkerCb     = getTaskPoolWorkerCb(),
  };

  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);

  // replace any previous exec result of the request with the one just produced
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));

  if (code != TSDB_CODE_SUCCESS) {
    schedulerFreeJob(&pRequest->body.queryJob, 0);

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
      TDMT_VND_CREATE_TABLE == pRequest->type) {
    pRequest->body.resInfo.numOfRows = res.numOfRows;
    if (TDMT_VND_SUBMIT == pRequest->type) {
      // account inserted rows in the per-cluster summary
      STscObj*            pTscObj = pRequest->pTscObj;
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
    }

    // the job is released right away for these request types
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  pRequest->code = res.code;
  terrno = res.code;
  return pRequest->code;
}
1009

1010
// Post-process the result of a submit request: for every auto-created table reported in the
// response that carries table meta, push that meta to the catalog via handleCreateTbExecRes().
//
// pRequest  unused here
// res       submit response, interpreted as SSubmitRsp2*
// pCatalog  catalog handle passed on to handleCreateTbExecRes()
// epset     unused here
//
// Returns TSDB_CODE_SUCCESS, or the first error reported while updating the catalog.
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
  if (NULL == pRsp->aCreateTbRsp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
  for (int32_t i = 0; i < tbNum; ++i) {
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
    // guard against a failed element lookup before touching the entry
    if (NULL != pTbRsp && pTbRsp->pMeta) {
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
    }
  }

  return TSDB_CODE_SUCCESS;
}
1027

1028
// Post-process the result of a query: convert every STbVerInfo entry of `res` into an
// STbSVersion and let the catalog check those versions via catalogChkTbMetaVersion().
//
// pRequest  request that ran the query (provides transporter and ids for the catalog call)
// res       array of STbVerInfo; an empty array is a no-op
// pCatalog  catalog handle
// epset     mnode endpoint set used for the catalog call
//
// Returns TSDB_CODE_SUCCESS, terrno when collecting the versions fails, or the catalog's code.
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SArray* pTbVerList = (SArray*)res;
  int32_t numOfTables = taosArrayGetSize(pTbVerList);
  if (numOfTables <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(numOfTables, sizeof(STbSVersion));
  if (NULL == pVersions) {
    return terrno;
  }

  int32_t code = 0;
  int32_t idx = 0;
  for (; idx < numOfTables; ++idx) {
    STbVerInfo* pInfo = taosArrayGet(pTbVerList, idx);
    if (NULL == pInfo) {
      code = terrno;
      break;
    }

    STbSVersion entry = {
        .tbFName = pInfo->tbFName, .sver = pInfo->sversion, .tver = pInfo->tversion, .rver = pInfo->rversion};
    if (NULL == taosArrayPush(pVersions, &entry)) {
      code = terrno;
      break;
    }
  }

  // only consult the catalog when every entry was collected (the loop ran to completion)
  if (idx == numOfTables) {
    SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                             .requestId = pRequest->requestId,
                             .requestObjRefId = pRequest->self,
                             .mgmtEps = *epset};

    code = catalogChkTbMetaVersion(pCatalog, &conn, pVersions);
  }

  taosArrayDestroy(pVersions);
  return code;
}
1068

1069
// Apply the table meta carried by an alter-table result (`res`, an STableMetaRsp*) to the
// catalog and return the code of catalogUpdateTableMeta().
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  int32_t        code = catalogUpdateTableMeta(pCatalog, pMetaRsp);
  return code;
}
1072

1073
// Hand the table meta carried by a create-table result (`res`, an STableMetaRsp*) to
// catalogAsyncUpdateTableMeta() and return its code.
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  int32_t        code = catalogAsyncUpdateTableMeta(pCatalog, pMetaRsp);
  return code;
}
1076

1077
// Dispatch the execution result stored in pRequest->body.resInfo.execRes to the handler
// matching its message type, so that catalog state is refreshed after the request ran.
//
// Returns pRequest->code when there is no result payload, the catalog handle error if the
// handle cannot be obtained, the handler's code otherwise, and TSDB_CODE_APP_ERROR for an
// unexpected message type.
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
  if (NULL == pRequest->body.resInfo.execRes.res) {
    return pRequest->code;
  }

  SCatalog*     pCatalog = NULL;
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);

  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
  if (code) {
    return code;
  }

  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
  SExecResult* pRes = &pRequest->body.resInfo.execRes;

  switch (pRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
      break;
    }
    case TDMT_VND_CREATE_TABLE: {
      SArray* pList = (SArray*)pRes->res;
      int32_t num = taosArrayGetSize(pList);
      for (int32_t i = 0; i < num; ++i) {
        void* res = taosArrayGetP(pList, i);
        // handleCreateTbExecRes will handle res == null
        int32_t tbCode = handleCreateTbExecRes(res, pCatalog);
        // keep processing the remaining tables, but do not let a later success hide an earlier failure
        if (TSDB_CODE_SUCCESS != tbCode && TSDB_CODE_SUCCESS == code) {
          code = tbCode;
        }
      }
      break;
    }
    case TDMT_MND_CREATE_STB: {
      code = handleCreateTbExecRes(pRes->res, pCatalog);
      break;
    }
    case TDMT_VND_SUBMIT: {
      // account the submitted bytes in the per-cluster summary
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);

      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
      break;
    }
    default:
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
               pRequest->type, pRequest->requestId);
      code = TSDB_CODE_APP_ERROR;
      break;
  }

  return code;
}
1132

1133
// True only when pStmt is a vnode-modify statement whose fileProcessing flag is set.
static bool incompletaFileParsing(SNode* pStmt) {
  if (QUERY_NODE_VNODE_MODIFY_STMT == nodeType(pStmt)) {
    return ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
  }
  return false;
}
1136

UNCOV
1137
// Resume parsing and planning of a request once its sub query has finished.
//
// pRequest  the request to continue (its wrapper supplies the parse context)
// pBlock    result block of the sub query, passed to qContinueParsePostQuery(); may be NULL
//
// The outcome is forwarded to handleQueryAnslyseRes(). The first failure among
// acquire / parse / plan / release is the one reported.
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;

  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    int64_t analyseStart = taosGetTimestampUs();
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
  }

  // the release result must not overwrite an earlier parse/plan failure
  int32_t releaseCode = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    code = releaseCode;
  }

  handleQueryAnslyseRes(pWrapper, NULL, code);
}
×
1154

1155
// Deliver the final result of pRequest to the client.
//
// When the request has no separate user request (userRefId is 0 or refers to itself) its own
// callback is invoked. Otherwise the user request is looked up, inherits pRequest->code, and
// its callback is invoked instead; a missing user request is only logged.
void returnToUser(SRequestObj* pRequest) {
  bool isUserRequest = (0 == pRequest->relation.userRefId) || (pRequest->relation.userRefId == pRequest->self);
  if (isUserRequest) {
    // return to client
    doRequestCallback(pRequest, pRequest->code);
    return;
  }

  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
  if (NULL == pUserReq) {
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.userRefId, pRequest->requestId);
    return;
  }

  pUserReq->code = pRequest->code;
  // return to client
  doRequestCallback(pUserReq, pUserReq->code);
  (void)releaseRequest(pRequest->relation.userRefId);
}
1174

UNCOV
1175
// Materialize `numOfRows` rows of a result set into a newly created data block.
//
// pRes       result set to read the rows from
// numOfRows  number of rows to fetch and copy
// pBlock     [out] receives the new block on success; reset to NULL on failure
//
// Each row must have at least three non-NULL leading columns, read here as
// (int64 timestamp, int32 vgId, int64 vgVer); the largest timestamp becomes the block's
// window end key. Returns TSDB_CODE_SUCCESS or an error code; on error the block is destroyed.
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
  int64_t     lastTs = 0;
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
  int32_t     numOfFields = taos_num_fields(pRes);

  int32_t code = createDataBlock(pBlock);
  if (code) {
    return code;
  }

  for (int32_t i = 0; i < numOfFields; ++i) {
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(*pBlock, numOfRows);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfRows; ++i) {
    TAOS_ROW pRow = taos_fetch_row(pRes);
    // a missing row, or fewer than the three columns read below, must not be dereferenced
    if (NULL == pRow || numOfFields < 3 || NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
      tscError("invalid data from vnode");
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto _error;
    }
    int64_t ts = *(int64_t*)pRow[0];
    if (lastTs < ts) {
      lastTs = ts;
    }

    for (int32_t j = 0; j < numOfFields; ++j) {
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
      if (NULL == pColInfoData) {
        code = TSDB_CODE_TSC_INTERNAL_ERROR;
        goto _error;
      }
      // columns beyond the first three may legitimately be NULL: mark them instead of copying from NULL
      code = colDataSetVal(pColInfoData, i, pRow[j], NULL == pRow[j]);
      if (TSDB_CODE_SUCCESS != code) {
        goto _error;
      }
    }

    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
            *(int64_t*)pRow[2]);
  }

  (*pBlock)->info.window.ekey = lastTs;
  (*pBlock)->info.rows = numOfRows;

  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
  return TSDB_CODE_SUCCESS;

_error:
  // never hand a destroyed block back to the caller
  blockDataDestroy(*pBlock);
  *pBlock = NULL;
  return code;
}
1231

UNCOV
1232
// Fetch callback of a sub query: turn the fetched rows into a data block and continue the
// follow-up request (relation.nextRefId) with it.
//
// param   unused here
// res     the sub query request (a SRequestObj*)
// rowNum  number of rows available in `res`
//
// If the request already failed, or the block cannot be built, the error is returned to the user.
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
  SRequestObj* pRequest = (SRequestObj*)res;
  SSDataBlock* pBlock = NULL;

  // only build the block when the sub query itself succeeded
  if (TSDB_CODE_SUCCESS == pRequest->code) {
    pRequest->code = createResultBlock(res, rowNum, &pBlock);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self,
               pRequest->requestId, tstrerror(pRequest->code));
    }
  }

  if (TSDB_CODE_SUCCESS != pRequest->code) {
    returnToUser(pRequest);
    return;
  }

  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
  if (NULL == pNextReq) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.nextRefId, pRequest->requestId);
  } else {
    continuePostSubQuery(pNextReq, pBlock);
    (void)releaseRequest(pRequest->relation.nextRefId);
  }

  blockDataDestroy(pBlock);
}
1259

UNCOV
1260
// Continue with the follow-up request after a sub query has been executed.
//
// For a query result the rows are fetched asynchronously first and postSubQueryFetchCb takes
// over; otherwise the next request (relation.nextRefId) is continued directly without a block.
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
  SRequestObj* pRequest = pWrapper->pRequest;

  if (TD_RES_QUERY(pRequest)) {
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
    return;
  }

  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
  if (NULL == pNextReq) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.nextRefId, pRequest->requestId);
    return;
  }

  continuePostSubQuery(pNextReq, NULL);
  (void)releaseRequest(pRequest->relation.nextRefId);
}
1276

1277
// TODO: refactor the error code management
1278
// Scheduler completion callback for an asynchronously executed request.
//
// pResult  execution result; may be NULL. It is copied into the request and then freed here.
// param    the SSqlCallbackWrapper of the request
// code     execution status reported by the scheduler
//
// Flow: store the result, account written rows, optionally restart the query when the error is
// one the client should handle, refresh catalog state through handleQueryExecRsp(), then either
// continue a CSV insert, continue with a follow-up sub query, or invoke the request callback.
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
  SSqlCallbackWrapper* pWrapper = param;
  SRequestObj*         pRequest = pWrapper->pRequest;
  STscObj*             pTscObj = pRequest->pTscObj;

  pRequest->code = code;
  if (NULL != pResult) {
    // replace any previous exec result with the one delivered by the scheduler
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
  }

  int32_t reqType = pRequest->type;
  bool    isSubmit = (TDMT_VND_SUBMIT == reqType);
  bool    isWriteReq = isSubmit || (TDMT_VND_DELETE == reqType) || (TDMT_VND_CREATE_TABLE == reqType);
  if (isWriteReq) {
    if (NULL != pResult) {
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;

      // record the insert rows
      if (isSubmit) {
        SAppClusterSummary* pSummary = &pTscObj->pAppInfo->summary;
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, pResult->numOfRows);
      }
    }
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  taosMemoryFree(pResult);
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
           pRequest->requestId);

  // errors the client should handle itself: drop the cached meta and restart the query
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL &&
      pRequest->stmtBindVersion == 0) {
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
    restartAsyncQuery(pRequest, code);
    return;
  }

  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
  }

  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;

  // a failure while handling the response only matters if the execution itself succeeded
  int32_t rspCode = handleQueryExecRsp(pRequest);
  if (TSDB_CODE_SUCCESS == pRequest->code && pRequest->code != rspCode) {
    pRequest->code = rspCode;
  }

  if (TSDB_CODE_SUCCESS == pRequest->code && NULL != pRequest->pQuery &&
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
    continueInsertFromCsv(pWrapper, pRequest);
    return;
  }

  if (pRequest->relation.nextRefId) {
    handlePostSubQuery(pWrapper);
    return;
  }

  destorySqlCallbackWrapper(pWrapper);
  pRequest->pWrapper = NULL;

  // return to client
  doRequestCallback(pRequest, code);
}
1347

1348
// Execute a parsed query synchronously according to its execution mode.
//
// pRequest   request to execute; its code, stmtType, execMode and exec result are updated
// pQuery     parsed query; destroyed here unless keepQuery is set
// keepQuery  when true the caller keeps ownership of pQuery
// res        optional [out]; when non-NULL it takes over the exec result payload of the request
//
// Any failure is reported through pRequest->code.
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
  int32_t code = 0;

  if (pQuery->pRoot) {
    pRequest->stmtType = pQuery->pRoot->type;
    if (nodeType(pQuery->pRoot) == QUERY_NODE_DELETE_STMT) {
      pRequest->secureDelete = ((SDeleteStmt*)pQuery->pRoot)->secureDelete;
    }
  }

  // count the request in the per-cluster summary, but only once (not again on retry)
  if (pQuery->pRoot && !pRequest->inRetry) {
    STscObj*            pTscObj = pRequest->pTscObj;
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
    }
  }

  pRequest->body.execMode = pQuery->execMode;
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      if (!pRequest->validateOnly) {
        if (NULL == pQuery->pRoot) {
          terrno = TSDB_CODE_INVALID_PARA;
          code = terrno;
        } else {
          code = execLocalCmd(pRequest, pQuery);
        }
      }
      break;
    case QUERY_EXEC_MODE_RPC:
      if (!pRequest->validateOnly) {
        code = execDdlQuery(pRequest, pQuery);
      }
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
      if (NULL == pMnodeList) {
        code = terrno;
        break;
      }
      SQueryPlan* pDag = NULL;
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
      if (TSDB_CODE_SUCCESS == code) {
        pRequest->body.subplanNum = pDag->numOfSubplans;
        if (!pRequest->validateOnly) {
          SArray* pNodeList = NULL;
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);

          // check the number of target nodes against the session metric before scheduling
          if (TSDB_CODE_SUCCESS == code) {
            code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
                                        taosArrayGetSize(pNodeList));
          }

          if (TSDB_CODE_SUCCESS == code) {
            code = scheduleQuery(pRequest, pDag, pNodeList);
          }
          taosArrayDestroy(pNodeList);
        }
      }
      taosArrayDestroy(pMnodeList);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    default:
      break;
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
    if (TSDB_CODE_SUCCESS != ret) {
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
               pRequest->requestId);
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = handleQueryExecRsp(pRequest);
  }

  if (TSDB_CODE_SUCCESS != code) {
    pRequest->code = code;
  }

  if (res) {
    // hand the result payload over to the caller and detach it from the request
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }
}
17,037,766✔
1446

1447
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
1,458,713,984✔
1448
                                 SSqlCallbackWrapper* pWrapper) {
1449
  int32_t code = TSDB_CODE_SUCCESS;
1,458,713,984✔
1450
  pRequest->type = pQuery->msgType;
1,458,713,984✔
1451
  SArray*     pMnodeList = NULL;
1,458,683,816✔
1452
  SArray*     pNodeList = NULL;
1,458,683,816✔
1453
  SQueryPlan* pDag = NULL;
1,458,615,288✔
1454
  int64_t     st = taosGetTimestampUs();
1,458,697,688✔
1455

1456
  if (!pRequest->parseOnly) {
1,458,697,688✔
1457
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
1,458,693,552✔
1458
    if (NULL == pMnodeList) {
1,458,715,710✔
UNCOV
1459
      code = terrno;
×
1460
    }
1461
    SPlanContext cxt = {.queryId = pRequest->requestId,
1,622,642,842✔
1462
                        .acctId = pRequest->pTscObj->acctId,
1,458,778,614✔
1463
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
1,458,780,712✔
1464
                        .pAstRoot = pQuery->pRoot,
1,458,811,978✔
1465
                        .showRewrite = pQuery->showRewrite,
1,458,817,076✔
1466
                        .isView = pWrapper->pParseCtx->isView,
1,458,793,414✔
1467
                        .isAudit = pWrapper->pParseCtx->isAudit,
1,458,747,964✔
1468
                        .privInfo = pWrapper->pParseCtx->privInfo,
1,458,729,144✔
1469
                        .pMsg = pRequest->msgBuf,
1,458,757,068✔
1470
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1471
                        .pUser = pRequest->pTscObj->user,
1,458,750,402✔
1472
                        .userId = pRequest->pTscObj->userId,
1,458,708,484✔
1473
                        .sysInfo = pRequest->pTscObj->sysInfo,
1,458,751,470✔
1474
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
1,458,721,182✔
1475
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
1,458,747,028✔
1476
    if (TSDB_CODE_SUCCESS == code) {
1,458,738,046✔
1477
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
1,458,767,602✔
1478
    }
1479
    if (code) {
1,458,758,696✔
1480
      tscError("req:0x%" PRIx64 " requestId:0x%" PRIx64 ", failed to create query plan, code:%s msg:%s", pRequest->self,
483,306✔
1481
               pRequest->requestId, tstrerror(code), cxt.pMsg);
1482
    } else {
1483
      pRequest->body.subplanNum = pDag->numOfSubplans;
1,458,275,390✔
1484
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
1,458,275,760✔
1485
    }
1486
  }
1487

1488
  pRequest->metric.execStart = taosGetTimestampUs();
1,458,763,070✔
1489
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
1,458,731,166✔
1490

1491
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
1,458,669,340✔
1492
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
1,457,868,184✔
1493
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
374,326,844✔
1494
    }
1495

1496
    if (code == TSDB_CODE_SUCCESS) {
1,457,841,352✔
1497
      code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
1,457,778,018✔
1498
                                  taosArrayGetSize(pNodeList));
1,457,814,976✔
1499
    }
1500

1501
    if (code == TSDB_CODE_SUCCESS) {
1,457,877,648✔
1502
      SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
1,457,871,778✔
1503
                               .requestId = pRequest->requestId,
1,457,908,244✔
1504
                               .requestObjRefId = pRequest->self};
1,457,907,332✔
1505
      SSchedulerReq    req = {
1,539,756,178✔
1506
             .syncReq       = false,
1507
             .localReq      = (tsQueryPolicy == QUERY_POLICY_CLIENT),
1,457,870,478✔
1508
             .pConn         = &conn,
1509
             .pNodeList     = pNodeList,
1510
             .pDag          = pDag,
1511
             .allocatorRefId = pRequest->allocatorRefId,
1,457,870,478✔
1512
             .sql           = pRequest->sqlstr,
1,457,790,976✔
1513
             .startTs       = pRequest->metric.start,
1,457,828,932✔
1514
             .execFp        = schedulerExecCb,
1515
             .cbParam       = pWrapper,
1516
             .chkKillFp     = chkRequestKilled,
1517
             .chkKillParam  = (void*)pRequest->self,
1,457,796,068✔
1518
             .pExecRes      = NULL,
1519
             .source        = pRequest->source,
1,457,864,114✔
1520
             .secureDelete  = pRequest->secureDelete,
1,457,849,794✔
1521
             .pWorkerCb     = getTaskPoolWorkerCb(),
1,457,798,054✔
1522
      };
1523

1524
      if (TSDB_CODE_SUCCESS == code) {
1,457,865,250✔
1525
        code = schedulerExecJob(&req, &pRequest->body.queryJob);
1,457,942,202✔
1526
      }
1527
      taosArrayDestroy(pNodeList);
1,457,874,868✔
1528
      taosArrayDestroy(pMnodeList);
1,457,960,752✔
1529
      return code;
1,457,926,510✔
1530
    }
1531
  }
1532

1533
  qDestroyQueryPlan(pDag);
874,032✔
1534
  tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
857,880✔
1535
           pRequest->requestId);
1536
  destorySqlCallbackWrapper(pWrapper);
857,880✔
1537
  pRequest->pWrapper = NULL;
857,880✔
1538
  if (TSDB_CODE_SUCCESS != code) {
857,880✔
1539
    pRequest->code = code;
489,176✔
1540
  }
1541

1542
  doRequestCallback(pRequest, code);
857,880✔
1543

1544
  // todo not to be released here
1545
  taosArrayDestroy(pMnodeList);
857,880✔
1546
  taosArrayDestroy(pNodeList);
857,880✔
1547

1548
  return code;
852,976✔
1549
}
1550

1551
// Dispatch a parsed statement according to pQuery->execMode.
//
// Parse-only requests are completed immediately via doRequestCallback().
// For any mode other than QUERY_EXEC_MODE_SCHEDULE the callback wrapper is
// destroyed up front and pRequest->pWrapper is cleared; in schedule mode the
// wrapper is forwarded to asyncExecSchQuery().
// Unless the request is being retried, the per-cluster insert/query request
// counters are bumped before dispatch.
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
  if (pRequest->parseOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  pRequest->body.execMode = pQuery->execMode;
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
    destorySqlCallbackWrapper(pWrapper);
    pRequest->pWrapper = NULL;
  }

  if (NULL != pQuery->pRoot && !pRequest->inRetry) {
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;

    // A vnode-modify statement with sqlNodeType == 0 is counted as an insert.
    const bool countAsInsert = (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) &&
                               (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType);
    if (countAsInsert) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
    }
  }

  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL: {
      asyncExecLocalCmd(pRequest, pQuery);
      break;
    }
    case QUERY_EXEC_MODE_RPC: {
      // The return value is not acted upon here.
      (void)asyncExecDdlQuery(pRequest, pQuery);
      break;
    }
    case QUERY_EXEC_MODE_SCHEDULE: {
      // The return value is not acted upon here.
      (void)asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT: {
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      doRequestCallback(pRequest, 0);
      break;
    }
    default: {
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
      doRequestCallback(pRequest, -1);
      break;
    }
  }
}
1597

1598
// Refresh catalog information for every database and table recorded on the
// request: catalogRefreshDBVgInfo() for each entry of pRequest->dbList, then
// catalogRefreshTableMeta() for each entry of pRequest->tableList.
//
// Returns TSDB_CODE_APP_ERROR when both lists are empty, the catalog-handle
// error if the handle cannot be obtained, or the first refresh failure.
// Processing stops at the first failure.
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  int32_t numOfTbls = taosArrayGetSize(pRequest->tableList);
  if (numOfDbs <= 0 && numOfTbls <= 0) {
    return TSDB_CODE_APP_ERROR;
  }

  SCatalog* pCtg = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestConnInfo connInfo = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // catalogRefreshDBVgInfo will handle a NULL db name.
  for (int32_t idx = 0; TSDB_CODE_SUCCESS == code && idx < numOfDbs; ++idx) {
    char* pDbFName = taosArrayGet(pRequest->dbList, idx);
    code = catalogRefreshDBVgInfo(pCtg, &connInfo, pDbFName);
  }

  // catalogRefreshTableMeta will handle a NULL table name.
  for (int32_t idx = 0; TSDB_CODE_SUCCESS == code && idx < numOfTbls; ++idx) {
    SName* pTblName = taosArrayGet(pRequest->tableList, idx);
    code = catalogRefreshTableMeta(pCtg, &connInfo, pTblName, -1);
  }

  return code;
}
1640

1641
// Drop cached catalog metadata for every name in tbList.
//
// With isView set, each non-NULL entry is removed through
// catalogRemoveViewMeta() using its full database name; NULL entries are
// skipped. Otherwise each entry (NULL or not) is passed straight to
// catalogRemoveTableMeta().
//
// Returns the catalog-handle error, or leaves through TSC_ERR_RET on the first
// removal failure, or TSDB_CODE_SUCCESS.
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
  int32_t   numOfNames = taosArrayGetSize(tbList);
  SCatalog* pCtg = NULL;

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  for (int32_t idx = 0; idx < numOfNames; ++idx) {
    SName* pName = taosArrayGet(tbList, idx);

    if (!isView) {
      TSC_ERR_RET(catalogRemoveTableMeta(pCtg, pName));
      continue;
    }

    if (NULL == pName) {
      continue;
    }

    char fullDbName[TSDB_DB_FNAME_LEN];
    (void)tNameGetFullDbName(pName, fullDbName);
    TSC_ERR_RET(catalogRemoveViewMeta(pCtg, fullDbName, 0, pName->tname, 0));
  }

  return TSDB_CODE_SUCCESS;
}
1668

1669
// Populate pEpSet with the management endpoints parsed from firstEp and
// secondEp.
//
// Each non-empty endpoint string is split into fqdn/port with
// taosGetFqdnPortFromEp() and then resolved with taosGetIpFromFqdn(). An
// endpoint whose fqdn cannot be resolved is logged, zeroed and skipped rather
// than treated as fatal.
//
// Returns 0 when at least one endpoint was accepted. Otherwise returns a
// non-zero error code:
//   - TSDB_CODE_TSC_INVALID_FQDN (also stored in terrno) when an endpoint
//     string is TSDB_EP_LEN characters or longer, or firstEp cannot be parsed;
//   - the taosGetFqdnPortFromEp() error when secondEp cannot be parsed;
//   - TSDB_CODE_RPC_NETWORK_UNAVAIL (also stored in terrno) when no endpoint
//     could be resolved.
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      // Return the real error code (previously a bare -1), matching the
      // identical check for secondEp below.
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      // Clear the slot so the next endpoint can reuse it.
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  return 0;
}
1728

1729
// Create a connection object, send a TDMT_MND_CONNECT request to the
// management endpoint set and wait for the response.
//
// On success *pTscObj holds the new connection and TSDB_CODE_SUCCESS is
// returned, except for connType == CONN_TYPE__AUTH_TEST, where the connection
// is closed again and *pTscObj is NULL.
// On failure a non-zero error code is returned. Every failure path that
// destroys the connection object also resets *pTscObj to NULL so the caller is
// never handed a pointer to a destroyed object.
//
// fp and param are not used by this function.
int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp,
                        void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
  *pTscObj = NULL;
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pRequest = NULL;
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    *pTscObj = NULL;
    return code;
  }

  pRequest->sqlstr = taosStrdup("taos_connect");
  if (pRequest->sqlstr) {
    pRequest->sqlLen = strlen(pRequest->sqlstr);
  } else {
    // NOTE(review): this path releases neither pRequest nor *pTscObj, which
    // looks like a leak — confirm the intended teardown before changing it.
    return terrno;
  }

  SMsgSendInfo* body = NULL;
  code = buildConnectMsg(pRequest, &body, totpCode);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    *pTscObj = NULL;
    return code;
  }

  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    *pTscObj = NULL;
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
    return code;
  }

  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
    destroyTscObj(*pTscObj);
    *pTscObj = NULL;
    tscError("failed to wait sem, code:%s", terrstr());
    return terrno;
  }

  if (pRequest->code != TSDB_CODE_SUCCESS) {
    // The connect result is carried in pRequest->code; the local `code` is
    // always TSDB_CODE_SUCCESS here, so testing it made the FQDN branch dead.
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    tscError("failed to connect to server, reason: %s", errorMsg);

    terrno = pRequest->code;
    destroyRequest(pRequest);
    taos_close_internal(*pTscObj);
    *pTscObj = NULL;
    return terrno;
  }

  if (connType == CONN_TYPE__AUTH_TEST) {
    // Authentication probe only: tear the connection down again.
    terrno = TSDB_CODE_SUCCESS;
    destroyRequest(pRequest);
    taos_close_internal(*pTscObj);
    *pTscObj = NULL;
    return TSDB_CODE_SUCCESS;
  }

  tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
          (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
  destroyRequest(pRequest);
  return code;
}
1794

1795
// Allocate and fill the SMsgSendInfo that carries a serialized SConnectReq for
// the connection owning pRequest.
//
// On success *pMsgSendInfo owns the send-info, its heap-allocated `param`
// (a copy of pRequest->self) and the serialized payload in msgInfo.pData, and
// TSDB_CODE_SUCCESS is returned.
// On failure everything allocated here is released (including `param`, which
// the earlier error paths leaked), *pMsgSendInfo is set to NULL and the terrno
// value observed at the point of failure is returned.
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode) {
  int32_t code = TSDB_CODE_SUCCESS;
  void*   pReq = NULL;

  *pMsgSendInfo = NULL;

  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pInfo == NULL) {
    return terrno;
  }

  pInfo->msgType = TDMT_MND_CONNECT;

  pInfo->requestObjRefId = pRequest->self;
  pInfo->requestId = pRequest->requestId;
  pInfo->fp = getMsgRspHandle(pInfo->msgType);
  pInfo->param = taosMemoryCalloc(1, sizeof(pRequest->self));
  if (NULL == pInfo->param) {
    code = terrno;
    goto _error;
  }

  *(int64_t*)pInfo->param = pRequest->self;

  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  } else if (terrno) {
    // No db name came back and terrno is set: treat it as a failure.
    code = terrno;
    goto _error;
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = appInfo.pid;
  connectReq.startTime = appInfo.startTime;
  connectReq.totpCode = totpCode;
  connectReq.connectTime = taosGetTimestampMs();

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
  tstrncpy(connectReq.token, pObj->token, sizeof(connectReq.token));
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
  tSignConnectReq(&connectReq);

  // First pass computes the encoded size, second pass writes the payload.
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  pReq = taosMemoryMalloc(contLen);
  if (NULL == pReq) {
    code = terrno;
    goto _error;
  }

  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
    code = terrno;
    goto _error;
  }

  pInfo->msgInfo.len = contLen;
  pInfo->msgInfo.pData = pReq;
  *pMsgSendInfo = pInfo;
  return TSDB_CODE_SUCCESS;

_error:
  // terrno was captured before these frees so they cannot disturb the result.
  taosMemoryFree(pReq);
  taosMemoryFree(pInfo->param);
  taosMemoryFree(pInfo);
  return code;
}
1856

1857
// Apply an endpoint-set change reported with a response (pEpSet) to the client
// side state selected by pSendInfo->target.type:
//   - TARGET_TYPE_MNODE: replace the application's management endpoint set;
//   - TARGET_TYPE_VNODE: update the vgroup endpoint set in the catalog and free
//     pSendInfo->target.dbFName afterwards;
//   - anything else: only a debug log.
// Does nothing when pEpSet is NULL. When pTscObj is NULL for an mnode/vnode
// target the change is logged as an error and dropped.
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (NULL == pEpSet) {
    return;
  }

  switch (pSendInfo->target.type) {
    case TARGET_TYPE_MNODE: {
      if (NULL == pTscObj) {
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
        return;
      }

      // Snapshot the current set purely for the debug message below.
      SEpSet  prevEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
      SEpSet* pPrev = &prevEpSet;
      SEp*    pPrevEp = &pPrev->eps[pPrev->inUse];
      SEp*    pNextEp = &pEpSet->eps[pEpSet->inUse];
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pPrev->inUse, pPrev->numOfEps,
               pPrevEp->fqdn, pPrevEp->port, pEpSet->inUse, pEpSet->numOfEps, pNextEp->fqdn, pNextEp->port);
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      break;
    }

    case TARGET_TYPE_VNODE: {
      if (NULL == pTscObj) {
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
        return;
      }

      SCatalog* pCtg = NULL;
      int32_t   rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
      if (TSDB_CODE_SUCCESS != rc) {
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
                 tstrerror(rc));
        return;
      }

      rc = catalogUpdateVgEpSet(pCtg, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
      if (TSDB_CODE_SUCCESS != rc) {
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
                 tstrerror(rc));
        return;
      }

      // The db name is released only after a successful catalog update.
      taosMemoryFreeClear(pSendInfo->target.dbFName);
      break;
    }

    default: {
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
      break;
    }
  }
}
1907

1908
// Deliver one server response to the handler registered on its send-info.
//
// The send-info comes from pMsg->info.ahandle. When it names a request
// (requestObjRefId != 0) a reference to that request is acquired for the
// duration of the call, so any endpoint-set change can be applied through the
// request's connection object. The payload is copied into a fresh buffer that
// is passed to the handler together with pEpSet.
//
// pMsg->pCont and the send-info are always released here. pEpSet is freed here
// only on the early-error paths; otherwise it is passed on through buf.pEpSet.
//
// Returns TSDB_CODE_TSC_INTERNAL_ERROR when there is no send-info or the
// acquired request does not match the recorded ref id, else TSDB_CODE_SUCCESS.
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pMsg->info.ahandle == NULL) {
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
    rpcFreeCont(pMsg->pCont);
    taosMemoryFree(pEpSet);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  STscObj*     pTscObj = NULL;
  SRequestObj* pRequest = NULL;

  STraceId* trace = &pMsg->info.traceId;
  char      tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));

  if (pSendInfo->requestObjRefId != 0) {
    pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest) {
      if (pRequest->self != pSendInfo->requestObjRefId) {
        // Hex conversion to match the "0x" prefix (was PRId64).
        tscError("doProcessMsgFromServer req:0x%" PRIx64 " != pSendInfo->requestObjRefId:0x%" PRIx64, pRequest->self,
                 pSendInfo->requestObjRefId);

        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
          tscError("doProcessMsgFromServer taosReleaseRef failed");
        }
        rpcFreeCont(pMsg->pCont);
        taosMemoryFree(pEpSet);
        destroySendMsgInfo(pSendInfo);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pTscObj = pRequest->pTscObj;
    }
  }

  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

  SDataBuf buf = {.msgType = pMsg->msgType,
                  .len = pMsg->contLen,
                  .pData = NULL,
                  .handle = pMsg->info.handle,
                  .handleRefId = pMsg->info.refId,
                  .pEpSet = pEpSet};

  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      // Report the allocation failure to the handler through the code.
      pMsg->code = terrno;
    } else {
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  // Release exactly when the acquire above succeeded, independent of whether
  // the request carried a connection object.
  if (pRequest) {
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("doProcessMsgFromServer taosReleaseRef failed");
      terrno = code;
      pMsg->code = code;
    }
  }

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
  return TSDB_CODE_SUCCESS;
}
1978

1979
int32_t doProcessMsgFromServer(void* param) {
2,147,483,647✔
1980
  AsyncArg* arg = (AsyncArg*)param;
2,147,483,647✔
1981
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
2,147,483,647✔
1982
  taosMemoryFree(arg);
2,147,483,647✔
1983
  return code;
2,147,483,647✔
1984
}
1985

1986
// Response callback: normalise the response code, copy the message (and the
// endpoint set, if any) and queue doProcessMsgFromServer() through
// taosAsyncExec().
//
// If a copy cannot be allocated or the task cannot be queued, the failure code
// is written into pMsg->code and the message is processed synchronously via
// doProcessMsgFromServerImpl() instead.
//
// parent is not used by this function.
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  SEpSet* pEpSetCopy = NULL;
  int32_t code = 0;

  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);

  do {
    if (NULL != pEpSet) {
      pEpSetCopy = taosMemoryCalloc(1, sizeof(SEpSet));
      if (NULL == pEpSetCopy) {
        code = terrno;
        pMsg->code = code;
        break;
      }
      (void)memcpy((void*)pEpSetCopy, (void*)pEpSet, sizeof(SEpSet));
    }

    // pMsg is a response msg. Connect responses get the original code
    // restored; all others are unified to TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED.
    const bool isConnectRsp = (pMsg->msgType == TDMT_MND_CONNECT + 1);
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
      pMsg->code = isConnectRsp ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
    } else if (isConnectRsp && pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    }

    AsyncArg* pArg = taosMemoryCalloc(1, sizeof(AsyncArg));
    if (NULL == pArg) {
      code = terrno;
      pMsg->code = code;
      break;
    }

    pArg->msg = *pMsg;
    pArg->pEpset = pEpSetCopy;

    code = taosAsyncExec(doProcessMsgFromServer, pArg, NULL);
    if (0 == code) {
      // Queued: the task now owns pArg and the endpoint-set copy.
      return;
    }

    pMsg->code = code;
    taosMemoryFree(pArg);
  } while (0);

  // Fallback: handle the message on the calling thread.
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
  code = doProcessMsgFromServerImpl(pMsg, pEpSetCopy);
  if (0 != code) {
    tscError("failed to sched msg to tsc, tsc ready quit");
  }
}
2041

2042
TAOS* taos_connect_totp(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
4,278✔
2043
                        uint16_t port) {
2044
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
4,278✔
2045
  if (user == NULL) {
4,278✔
UNCOV
2046
    user = TSDB_DEFAULT_USER;
×
2047
  }
2048

2049
  if (pass == NULL) {
4,278✔
UNCOV
2050
    pass = TSDB_DEFAULT_PASS;
×
2051
  }
2052

2053
  STscObj* pObj = NULL;
4,278✔
2054
  int32_t  code = taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__QUERY, &pObj);
4,278✔
2055
  if (TSDB_CODE_SUCCESS == code) {
4,278✔
2056
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
2,828✔
2057
    if (NULL == rid) {
2,828✔
UNCOV
2058
      tscError("out of memory when taos_connect_totp to %s:%u, user:%s db:%s", ip, port, user, db);
×
UNCOV
2059
      return NULL;
×
2060
    }
2061
    *rid = pObj->id;
2,828✔
2062
    return (TAOS*)rid;
2,828✔
2063
  } else {
2064
    terrno = code;
1,450✔
2065
  }
2066

2067
  return NULL;
1,450✔
2068
}
2069

2070
int taos_connect_test(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
380✔
2071
                      uint16_t port) {
2072
  tscInfo("try to test connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
380✔
2073
  if (user == NULL) {
380✔
UNCOV
2074
    user = TSDB_DEFAULT_USER;
×
2075
  }
2076

2077
  if (pass == NULL) {
380✔
UNCOV
2078
    pass = TSDB_DEFAULT_PASS;
×
2079
  }
2080

2081
  STscObj* pObj = NULL;
380✔
2082
  return taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__AUTH_TEST, &pObj);
380✔
2083
}
2084

2085
TAOS* taos_connect_token(const char* ip, const char* token, const char* db, uint16_t port) {
4,832✔
2086
  tscInfo("try to connect to %s:%u by token, db:%s", ip, port, db);
4,832✔
2087

2088
  STscObj* pObj = NULL;
4,832✔
2089
  int32_t  code = taos_connect_by_auth(ip, NULL, token, NULL, db, port, CONN_TYPE__QUERY, &pObj);
4,832✔
2090
  if (TSDB_CODE_SUCCESS == code) {
4,832✔
2091
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
2,346✔
2092
    if (NULL == rid) {
2,346✔
UNCOV
2093
      tscError("out of memory when taos_connect_token to %s:%u db:%s", ip, port, db);
×
UNCOV
2094
      return NULL;
×
2095
    }
2096
    *rid = pObj->id;
2,346✔
2097
    return (TAOS*)rid;
2,346✔
2098
  } else {
2099
    terrno = code;
2,486✔
2100
  }
2101

2102
  return NULL;
2,486✔
2103
}
2104

2105
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
294✔
2106
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
294✔
2107
  if (user == NULL) {
294✔
UNCOV
2108
    user = TSDB_DEFAULT_USER;
×
2109
  }
2110

2111
  if (auth == NULL) {
294✔
UNCOV
2112
    tscError("No auth info is given, failed to connect to server");
×
2113
    return NULL;
×
2114
  }
2115

2116
  STscObj* pObj = NULL;
294✔
2117
  int32_t  code = taos_connect_by_auth(ip, user, auth, NULL, db, port, CONN_TYPE__QUERY, &pObj);
294✔
2118
  if (TSDB_CODE_SUCCESS == code) {
294✔
2119
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
96✔
2120
    if (NULL == rid) {
96✔
UNCOV
2121
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2122
    }
2123
    *rid = pObj->id;
96✔
2124
    return (TAOS*)rid;
96✔
2125
  }
2126

2127
  return NULL;
198✔
2128
}
2129

2130
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
2131
//                      const char* db, int dbLen, uint16_t port) {
2132
//   char ipStr[TSDB_EP_LEN] = {0};
2133
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
2134
//   char userStr[TSDB_USER_LEN] = {0};
2135
//   char passStr[TSDB_PASSWORD_LEN] = {0};
2136
//
2137
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
2138
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
2139
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
2140
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
2141
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
2142
// }
2143

2144
/*
 * Fill pResultInfo->row[] and pResultInfo->length[] for the row at index
 * pResultInfo->current, one entry per column.
 *
 * NULL cells get row[i] = NULL and length[i] = 0. Variable-length cells are
 * located through the column's offset array and report the length stored in
 * the cell header; fixed-length cells are addressed by schemaBytes * current.
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    SResultColumn* pColumn = &pResultInfo->pCol[col];

    int32_t colType = pResultInfo->fields[col].type;
    int32_t colBytes = calcSchemaBytesFromTypeBytes(colType, pResultInfo->userFields[col].bytes, false);

    // Fixed-length column: NULL-ness comes from the bitmap, data is a flat array.
    if (!IS_VAR_DATA_TYPE(colType)) {
      if (colDataIsNull_f(pColumn, pResultInfo->current)) {
        pResultInfo->row[col] = NULL;
        pResultInfo->length[col] = 0;
      } else {
        pResultInfo->row[col] = pColumn->pData + colBytes * pResultInfo->current;
        pResultInfo->length[col] = colBytes;
      }
      continue;
    }

    // Variable-length column: an offset of -1 marks a NULL cell.
    if (IS_VAR_NULL_TYPE(colType, colBytes) || pColumn->offset[pResultInfo->current] == -1) {
      pResultInfo->row[col] = NULL;
      pResultInfo->length[col] = 0;
      continue;
    }

    char* pCell = pColumn->offset[pResultInfo->current] + pColumn->pData;
    if (IS_STR_DATA_BLOB(colType)) {
      pResultInfo->length[col] = blobDataLen(pCell);
      pResultInfo->row[col] = blobDataVal(pCell);
    } else {
      pResultInfo->length[col] = varDataLen(pCell);
      pResultInfo->row[col] = varDataVal(pCell);
    }
  }
}
2,147,483,647✔
2177

UNCOV
2178
/*
 * Synchronous fetch: return the request's row-pointer array, pulling the next
 * result block from the scheduler when the buffered block is missing or fully
 * consumed.
 *
 * setupOneRowPtr  when true, point row[] at the current row and advance the
 *                 cursor by one.
 * convertUcs4     forwarded to setQueryResultFromRsp().
 *
 * Returns NULL when pRequest is NULL, the result set is completed, a fetch or
 * decode step fails (pRequest->code holds the error), or the fetched block has
 * no rows.
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (NULL == pRequest) {
    return NULL;
  }

  SReqResultInfo* pInfo = &pRequest->body.resInfo;
  bool            needFetch = (pInfo->pData == NULL) || (pInfo->current >= pInfo->numOfRows);

  if (needFetch) {
    // All data has returned to App already, no need to try again
    if (pInfo->completed) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    SSchedulerReq fetchReq = {.syncReq = true, .pFetchRes = (void**)&pInfo->pData};

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &fetchReq);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    pRequest->code = setQueryResultFromRsp(pInfo, (const SRetrieveTableRsp*)pInfo->pData, convertUcs4,
                                           pRequest->stmtBindVersion > 0);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
             ", complete:%d, QID:0x%" PRIx64,
             pRequest->self, pInfo->numOfRows, pInfo->totalRows, pInfo->completed, pRequest->requestId);

    // account the fetched payload in the per-cluster activity summary
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    (void)atomic_add_fetch_64((int64_t*)&pSummary->fetchBytes, pInfo->payloadLen);

    if (0 == pInfo->numOfRows) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pInfo);
    pInfo->current += 1;
  }

  return pInfo->row;
}
2227

2228
/*
 * Fetch-completion callback used by doAsyncFetchRows(): posts the semaphore
 * passed in param. res and numOfRows are not used.
 */
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
  if (tsem_post((tsem_t*)param) == TSDB_CODE_SUCCESS) {
    return;
  }

  tscError("failed to post sem, code:%s", terrstr());
}
405,128,828✔
2234

2235
/*
 * Blocking wrapper around taos_fetch_rows_a(): when the buffered block is
 * missing or fully consumed, start an async fetch and wait on a local
 * semaphore until syncFetchFn() posts it.
 *
 * setupOneRowPtr  when true, point row[] at the current row and advance the
 *                 cursor by one.
 * convertUcs4     stored in the result info before the fetch is issued.
 *
 * Returns the row-pointer array, or NULL when pRequest is NULL, the result set
 * is completed, the semaphore cannot be created, no rows are available, or
 * pRequest->code reports an error.
 */
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (pRequest == NULL) {
    return NULL;
  }

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    // All data has returned to App already, no need to try again
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    // convert ucs4 to native multi-bytes string
    pResultInfo->convertUcs4 = convertUcs4;
    tsem_t sem;
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
      // Without a usable semaphore the callback/wait handshake below is undefined, so fail the fetch here.
      tscError("failed to init sem, code:%s", terrstr());
      pRequest->code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
      pResultInfo->numOfRows = 0;
      return NULL;
    }
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
      tscError("failed to wait sem, code:%s", terrstr());
    }
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
      tscError("failed to destroy sem, code:%s", terrstr());
    }
    pRequest->inCallback = false;
  }

  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
  }

  return pResultInfo->row;
}
2275

2276
/*
 * Lazily allocate the per-column arrays of a result info (row, pCol, length,
 * convertBuf), each sized by numOfCols. Does nothing when row is already set.
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno left by the failed allocation. On
 * failure all four members are released and reset to NULL, so a later call
 * retries the allocation instead of seeing a stale non-NULL row pointer.
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
  pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
  pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

  if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
    // remember the allocation error before the cleanup calls get a chance to touch terrno
    int32_t code = terrno;

    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2294

2295
/*
 * Convert every non-empty NCHAR column of the current block from UCS-4 to the
 * multi-byte charset, writing into pResultInfo->convertBuf[col] (resized to
 * colLength[col]) and re-pointing the column's offsets and data at that buffer.
 *
 * colLength  per-column data length of the block
 * isStmt     forwarded to calcSchemaBytesFromTypeBytes()
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR when no converter
 * can be acquired or a converted cell is out of range, or terrno when the
 * buffer cannot be resized.
 */
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
  int32_t convIdx = -1;
  iconv_t cd = taosAcquireConv(&convIdx, C2M, pResultInfo->charsetCxt);
  if (cd == (iconv_t)-1) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    int32_t colType = pResultInfo->fields[col].type;
    int32_t maxBytes =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[col].type, pResultInfo->fields[col].bytes, isStmt);

    if (colType != TSDB_DATA_TYPE_NCHAR || colLength[col] <= 0) {
      continue;
    }

    char* pOut = taosMemoryRealloc(pResultInfo->convertBuf[col], colLength[col]);
    if (NULL == pOut) {
      taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
      return terrno;
    }
    pResultInfo->convertBuf[col] = pOut;

    SResultColumn* pColumn = &pResultInfo->pCol[col];
    for (int32_t r = 0; r < pResultInfo->numOfRows; ++r) {
      // -1 marks a NULL cell: nothing to convert, offset stays as is
      if (pColumn->offset[r] == -1) {
        continue;
      }

      char*   pSrc = pColumn->offset[r] + pColumn->pData;
      int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pSrc), varDataLen(pSrc), varDataVal(pOut), cd);

      bool invalid = (len < 0) || (len > maxBytes) ||
                     ((pOut + len) >= (pResultInfo->convertBuf[col] + colLength[col]));
      if (invalid) {
        tscError(
            "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
            "colLength[i]):%p",
            len, maxBytes, (pOut + len), (pResultInfo->convertBuf[col] + colLength[col]));
        taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      // store the converted cell and make the offset relative to the conversion buffer
      varDataSetLen(pOut, len);
      pColumn->offset[r] = (pOut - pResultInfo->convertBuf[col]);
      pOut += (len + VARSTR_HEADER_SIZE);
    }

    pResultInfo->pCol[col].pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pResultInfo->pCol[col].pData;
  }

  taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
  return TSDB_CODE_SUCCESS;
}
2342

2343
/*
 * Render every decimal column that has data as text: each row becomes a
 * 64-byte slot in pResultInfo->convertBuf[col] filled by decimalToStr(), and
 * the column data / row pointers are switched to that buffer. The bytes of
 * both field descriptors (fields[] and userFields[]) are set to 64.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the buffer cannot be resized, or the
 * first error reported by decimalToStr().
 */
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    TAOS_FIELD_E* pFieldE = &pResultInfo->fields[col];
    TAOS_FIELD*   pUserField = &pResultInfo->userFields[col];
    int32_t       colType = pFieldE->type;

    if (!IS_DECIMAL_TYPE(colType) || !pResultInfo->pCol[col].pData) {
      continue;
    }

    // width of one textual decimal slot
    int32_t slotLen = 64;
    char*   pOut = taosMemoryRealloc(pResultInfo->convertBuf[col], slotLen * pResultInfo->numOfRows);
    pFieldE->bytes = slotLen;
    pUserField->bytes = slotLen;
    if (!pOut) {
      return terrno;
    }
    pResultInfo->convertBuf[col] = pOut;

    for (int32_t r = 0; r < pResultInfo->numOfRows; ++r) {
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[col].pData + r * tDataTypes[colType].bytes),
                                  colType, pFieldE->precision, pFieldE->scale, pOut, slotLen);
      pOut += slotLen;
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    pResultInfo->pCol[col].pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pResultInfo->pCol[col].pData;
  }

  return TSDB_CODE_SUCCESS;
}
2374

2375
/*
 * Size in bytes of the block metadata that precedes the per-column length
 * array: five int32 header words, one uint64, and one (int8, int32) pair per
 * column. The p argument is not read.
 */
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
  size_t fixedPart = sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t);
  size_t perColumn = sizeof(int8_t) + sizeof(int32_t);

  return fixedPart + numOfCols * perColumn;
}
2379

2380
/*
 * Estimate how many bytes the block in pResultInfo->pData needs once its JSON
 * columns are rendered as text. Non-JSON columns contribute their current
 * size; a JSON column contributes the larger of its current length and a
 * per-cell estimate based on the inner value type.
 *
 * Returns the estimated length, TSDB_CODE_TSC_INTERNAL_ERROR when the column
 * count in the block disagrees with numOfCols, or -1 for an unknown inner
 * JSON value type.
 */
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
  char*   pBlock = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)pBlock;

  int32_t rowCnt = pResultInfo->numOfRows;
  int32_t colCnt = pResultInfo->numOfCols;

  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
  // length |
  int32_t colsInBlock = *(int32_t*)(pBlock + sizeof(int32_t) * 3);
  if (colCnt != colsInBlock) {
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", colCnt, colsInBlock);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t  total = getVersion1BlockMetaSize(pBlock, colCnt);
  int32_t* colLength = (int32_t*)(pBlock + total);
  total += sizeof(int32_t) * colCnt;

  char* pCursor = pBlock + total;
  for (int32_t c = 0; c < colCnt; ++c) {
    int32_t colLen;
    if (blockVersion == BLOCK_VERSION_1) {
      colLen = htonl(colLength[c]);
    } else {
      colLen = colLength[c];
    }

    if (pResultInfo->fields[c].type != TSDB_DATA_TYPE_JSON) {
      // offsets array for variable-length columns, NULL bitmap otherwise
      int32_t prefixLen;
      if (IS_VAR_DATA_TYPE(pResultInfo->fields[c].type)) {
        prefixLen = rowCnt * sizeof(int32_t);
      } else {
        prefixLen = BitmapLen(pResultInfo->numOfRows);
      }
      total += (prefixLen + colLen);
      pCursor += prefixLen;
      pCursor += colLen;
      continue;
    }

    int32_t* offset = (int32_t*)pCursor;
    int32_t  offsetBytes = rowCnt * sizeof(int32_t);
    total += offsetBytes;
    pCursor += offsetBytes;

    int32_t estimated = 0;
    for (int32_t r = 0; r < rowCnt; ++r) {
      if (offset[r] == -1) {
        continue;
      }

      char*   cell = offset[r] + pCursor;
      int32_t innerType = *cell;
      char*   innerData = cell + CHAR_BYTES;

      if (innerType == TSDB_DATA_TYPE_NULL) {
        estimated += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
      } else if (tTagIsJson(cell)) {
        estimated += (VARSTR_HEADER_SIZE + ((const STag*)(cell))->len);
      } else if (innerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
        estimated += varDataTLen(innerData) + CHAR_BYTES * 2;
      } else if (innerType == TSDB_DATA_TYPE_DOUBLE) {
        estimated += (VARSTR_HEADER_SIZE + 32);
      } else if (innerType == TSDB_DATA_TYPE_BOOL) {
        estimated += (VARSTR_HEADER_SIZE + 5);
      } else if (IS_STR_DATA_BLOB(innerType)) {
        estimated += (BLOBSTR_HEADER_SIZE + 32);
      } else {
        tscError("estimateJsonLen error: invalid type:%d", innerType);
        return -1;
      }
    }

    total += TMAX(colLen, estimated);
    pCursor += colLen;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  total += sizeof(bool);
  return total;
}
2453

2454
/*
 * If the result set has at least one JSON column, rebuild the block in
 * pResultInfo->pData into pResultInfo->convertJson with every JSON cell
 * rendered as a text var-string, then point pData at the rebuilt block.
 * Non-JSON columns are copied unchanged. The total-length word of the new
 * block (offset 4) is rewritten to the rebuilt size.
 *
 * Returns TSDB_CODE_SUCCESS (also when no JSON column exists), terrno on
 * allocation or tag-to-JSON failure, or TSDB_CODE_TSC_INTERNAL_ERROR when the
 * block is inconsistent, an inner type is unknown, or a converted cell would
 * not fit into the estimated buffer.
 */
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
  int32_t numOfRows = pResultInfo->numOfRows;
  int32_t numOfCols = pResultInfo->numOfCols;
  bool    needConvert = false;
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      needConvert = true;
      break;
    }
  }

  if (!needConvert) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to convert form json format string");

  char*   p = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)p;
  int32_t dataLen = estimateJsonLen(pResultInfo);
  if (dataLen <= 0) {
    tscError("doConvertJson error: estimateJsonLen failed");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->convertJson);
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
  if (pResultInfo->convertJson == NULL) return terrno;
  char* p1 = pResultInfo->convertJson;

  int32_t totalLen = 0;
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
  if (numOfCols != cols) {
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // block header and column schema are copied verbatim
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
  (void)memcpy(p1, p, len);

  p += len;
  p1 += len;
  totalLen += len;

  // per-column length array; JSON entries are overwritten below
  len = sizeof(int32_t) * numOfCols;
  int32_t* colLength = (int32_t*)p;
  int32_t* colLength1 = (int32_t*)p1;
  (void)memcpy(p1, p, len);
  p += len;
  p1 += len;
  totalLen += len;

  char* pStart = p;
  char* pStart1 = p1;
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
    if (colLen >= dataLen) {
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      int32_t* offset = (int32_t*)pStart;
      int32_t* offset1 = (int32_t*)pStart1;
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;

      len = 0;
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {
          continue;
        }
        char* data = offset[j] + pStart;

        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (tTagIsJson(data)) {
          char* jsonString = NULL;
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
          if (jsonString == NULL) {
            tscError("doConvertJson error: parseTagDatatoJson failed");
            return terrno;
          }
          STR_TO_VARSTR(dst, jsonString);
          taosMemoryFree(jsonString);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          *(char*)varDataVal(dst) = '\"';
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(tmp),
                                         pResultInfo->charsetCxt);
          if (length <= 0) {
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
            length = 0;
          }
          // NOTE(review): the limit passed here does not subtract VARSTR_HEADER_SIZE although the output starts
          // behind the header and the opening quote - confirm escapeToPrinted() cannot run past dst.
          int32_t escapeLength = escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - CHAR_BYTES * 2,
                                                 varDataVal(tmp), length);
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
          // per-value trace only; this is not an error condition
          tscDebug("value:%s.", varDataVal(dst));
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          double jsonVd = *(double*)(jsonInnerData);
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else {
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        // The destination was sized from an estimate; refuse to write past it.
        int32_t dstLen = varDataTLen(dst);
        if ((pStart1 - pResultInfo->convertJson) + len + dstLen > dataLen) {
          tscError("doConvertJson error: converted json data exceeds estimated dataLen:%d", dataLen);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        offset1[j] = len;
        (void)memcpy(pStart1 + len, dst, dstLen);
        len += dstLen;
      }
      colLen1 = len;
      totalLen += colLen1;
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    } else {
      len = BitmapLen(pResultInfo->numOfRows);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    }
    pStart += colLen;
    pStart1 += colLen1;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  // (void)memcpy(pStart1, pStart, sizeof(bool));
  totalLen += sizeof(bool);

  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
  pResultInfo->pData = pResultInfo->convertJson;
  return TSDB_CODE_SUCCESS;
}
2611

2612
/*
 * Parse the data block in pResultInfo->pData and wire the per-column pointers
 * (offset array or NULL bitmap, data start, row[] and length[]) to it.
 *
 * Block layout as read here: version, total length, rows, cols, flag segment
 * (all int32), group id (uint64), one (int8 type, int32 bytes) pair per
 * column, one int32 length per column, then the column payloads followed by
 * one bool.
 *
 * convertUcs4  when true, NCHAR columns are converted by doConvertUCS4()
 *              (unless DISALLOW_NCHAR_WITHOUT_ICONV is defined) and decimal
 *              columns are rendered by convertDecimalType()
 * isStmt       forwarded to calcSchemaBytesFromTypeBytes() / doConvertUCS4()
 *
 * Returns TSDB_CODE_SUCCESS (also when numOfRows is 0), or an error code when
 * the arguments or block contents are inconsistent or a helper fails.
 *
 * Note: for BLOCK_VERSION_1 the column length array is byte-swapped in place
 * inside the block.
 */
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
    tscError("setResultDataPtr paras error");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pResultInfo->numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pResultInfo->pData == NULL) {
    tscError("setResultDataPtr error: pData is NULL");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  code = doConvertJson(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  char* p = (char*)pResultInfo->pData;

  // version:
  int32_t blockVersion = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t dataLen = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t rows = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t cols = *(int32_t*)p;
  p += sizeof(int32_t);

  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // the flag segment and the block group id are not used here; just step over them
  p += sizeof(int32_t);
  p += sizeof(uint64_t);

  // check fields
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    int8_t type = *(int8_t*)p;
    p += sizeof(int8_t);

    int32_t bytes = *(int32_t*)p;
    p += sizeof(int32_t);

    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
    }
  }

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * pResultInfo->numOfCols;

  char* pStart = p;
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    if ((pStart - pResultInfo->pData) >= dataLen) {
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (blockVersion == BLOCK_VERSION_1) {
      colLength[i] = htonl(colLength[i]);
    }
    // a negative length would move the cursor backwards, so reject it together with oversized ones
    if (colLength[i] < 0 || colLength[i] >= dataLen) {
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
      tscError("invalid type %d", pResultInfo->fields[i].type);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
    pResultInfo->length[i] =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
  }

  p = pStart;
  // bool blankFill = *(bool*)p;
  p += sizeof(bool);
  int32_t offset = p - pResultInfo->pData;
  if (offset > dataLen) {
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
  }
#endif
  // decimal rendering is requested by the same flag as the UCS-4 conversion
  if (TSDB_CODE_SUCCESS == code && convertUcs4) {
    code = convertDecimalType(pResultInfo);
  }
  return code;
}
2732

2733
/*
 * Return a duplicate of the connection's current database name, taken under
 * the connection mutex. terrno is reset to TSDB_CODE_SUCCESS on entry.
 *
 * Returns NULL when no database is set or when the duplication fails (the
 * failure is logged).
 * NOTE(review): the copy comes from taosStrndup(), so the caller presumably has to free it - confirm.
 */
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;

  terrno = TSDB_CODE_SUCCESS;
  (void)taosThreadMutexLock(&pObj->mutex);

  // an empty name means no database is selected
  if (pObj->db[0] != '\0') {
    dbCopy = taosStrndup(pObj->db, tListLen(pObj->db));
    if (NULL == dbCopy) {
      tscError("failed to taosStrndup db name");
    }
  }

  (void)taosThreadMutexUnlock(&pObj->mutex);
  return dbCopy;
}
2748

2749
/*
 * Store db as the connection's current database name, under the connection
 * mutex. The copy is bounded by the size of pTscObj->db. Logs and does
 * nothing when either argument is NULL.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  bool validArgs = (db != NULL) && (pTscObj != NULL);
  if (!validArgs) {
    tscError("setConnectionDB para is NULL");
    return;
  }

  (void)taosThreadMutexLock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
}
2759

UNCOV
2760
// Clear the database name stored in the connection (makes it an empty string).
// The write happens under pTscObj->mutex; a NULL connection is a no-op.
void resetConnectDB(STscObj* pTscObj) {
  if (NULL != pTscObj) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = 0;
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
  }
}
2769

2770
// Install a retrieve response as the current result set of `pResultInfo`.
//
// The previously held response message is freed and `pRsp` is stored in
// pResultInfo->pRspMsg. Row count, completion flag and precision are taken from
// the response header (numOfRows/payloadLen/compLen are byte-swapped with
// htobe64/htonl). When the response is flagged as compressed and the embedded
// compLen is smaller than rawLen, the payload is decompressed into
// pResultInfo->decompBuf (grown on demand); otherwise pData points directly
// into the response. Finally the per-column pointers are set up through
// setResultDataPtr().
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR for NULL arguments or
// inconsistent length fields, terrno on allocation failure, or whatever
// setResultDataPtr() returns.
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
                              bool isStmt) {
  if (pResultInfo == NULL || pRsp == NULL) {
    tscError("setQueryResultFromRsp paras is null");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->pRspMsg);
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->precision = pRsp->precision;

  // decompress data if needed
  int32_t payloadLen = htonl(pRsp->payloadLen);
  if (payloadLen < 0) {
    // a negative length can only come from a corrupted header; never size a buffer with it
    tscError("invalid payloadLen:%d in retrieve rsp", payloadLen);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pRsp->compressed) {
    if (pResultInfo->decompBuf == NULL) {
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
      if (pResultInfo->decompBuf == NULL) {
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
        return terrno;
      }
      pResultInfo->decompBufSize = payloadLen;
    } else if (pResultInfo->decompBufSize < payloadLen) {
      // keep the old buffer reachable until the reallocation is known to have succeeded
      char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
      if (p == NULL) {
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
        return terrno;
      }

      pResultInfo->decompBuf = p;
      pResultInfo->decompBufSize = payloadLen;
    }
  }

  if (payloadLen > 0) {
    // the payload starts with two int32 fields: compressed length and raw length
    int32_t compLen = *(int32_t*)pRsp->data;
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));

    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;

    if (pRsp->compressed && compLen < rawLen) {
      // rawLen is the number of bytes the decompressor may write; it must fit the buffer
      if (compLen < 0 || rawLen > pResultInfo->decompBufSize) {
        tscError("invalid compressed block, compLen:%d rawLen:%d bufSize:%d", compLen, rawLen,
                 (int32_t)pResultInfo->decompBufSize);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      if (len < 0) {
        tscError("tsDecompressString failed");
        return terrno ? terrno : TSDB_CODE_FAILED;
      }
      if (len != rawLen) {
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pResultInfo->pData = pResultInfo->decompBuf;
      pResultInfo->payloadLen = rawLen;
    } else {
      pResultInfo->pData = pStart;
      pResultInfo->payloadLen = htonl(pRsp->compLen);
      if (pRsp->compLen != pRsp->payloadLen) {
        // report the host-order values so the log is readable
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", (int32_t)htonl(pRsp->compLen), payloadLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
    }
  }

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
  return code;
}
2843

2844
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
400✔
2845
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
400✔
2846
  void*              clientRpc = NULL;
400✔
2847
  SServerStatusRsp   statusRsp = {0};
400✔
2848
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
400✔
2849
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
400✔
2850
  SRpcMsg  rpcRsp = {0};
400✔
2851
  SRpcInit rpcInit = {0};
400✔
2852
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
400✔
2853

2854
  rpcInit.label = "CHK";
400✔
2855
  rpcInit.numOfThreads = 1;
400✔
2856
  rpcInit.cfp = NULL;
400✔
2857
  rpcInit.sessions = 16;
400✔
2858
  rpcInit.connType = TAOS_CONN_CLIENT;
400✔
2859
  rpcInit.idleTime = tsShellActivityTimer * 1000;
400✔
2860
  rpcInit.compressSize = tsCompressMsgSize;
400✔
2861
  rpcInit.user = "_dnd";
400✔
2862

2863
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
400✔
2864
  connLimitNum = TMAX(connLimitNum, 10);
400✔
2865
  connLimitNum = TMIN(connLimitNum, 500);
400✔
2866
  rpcInit.connLimitNum = connLimitNum;
400✔
2867
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
400✔
2868
  rpcInit.readTimeout = tsReadTimeout;
400✔
2869
  rpcInit.ipv6 = tsEnableIpv6;
400✔
2870
  rpcInit.enableSSL = tsEnableTLS;
400✔
2871

2872
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
400✔
2873
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
400✔
2874
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
400✔
2875
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
400✔
2876
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
400✔
2877

2878
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
400✔
UNCOV
2879
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
UNCOV
2880
    goto _OVER;
×
2881
  }
2882

2883
  clientRpc = rpcOpen(&rpcInit);
400✔
2884
  if (clientRpc == NULL) {
400✔
2885
    code = terrno;
×
UNCOV
2886
    tscError("failed to init server status client since %s", tstrerror(code));
×
UNCOV
2887
    goto _OVER;
×
2888
  }
2889

2890
  if (fqdn == NULL) {
400✔
2891
    fqdn = tsLocalFqdn;
400✔
2892
  }
2893

2894
  if (port == 0) {
400✔
2895
    port = tsServerPort;
400✔
2896
  }
2897

2898
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
400✔
2899
  epSet.eps[0].port = (uint16_t)port;
400✔
2900
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
400✔
2901
  if (TSDB_CODE_SUCCESS != ret) {
400✔
UNCOV
2902
    tscError("failed to send recv since %s", tstrerror(ret));
×
UNCOV
2903
    goto _OVER;
×
2904
  }
2905

2906
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
400✔
2907
    tscError("failed to send server status req since %s", terrstr());
94✔
2908
    goto _OVER;
94✔
2909
  }
2910

2911
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
306✔
UNCOV
2912
    tscError("failed to parse server status rsp since %s", terrstr());
×
UNCOV
2913
    goto _OVER;
×
2914
  }
2915

2916
  code = statusRsp.statusCode;
306✔
2917
  if (details != NULL) {
306✔
2918
    tstrncpy(details, statusRsp.details, maxlen);
306✔
2919
  }
2920

2921
_OVER:
372✔
2922
  if (clientRpc != NULL) {
400✔
2923
    rpcClose(clientRpc);
400✔
2924
  }
2925
  if (rpcRsp.pCont != NULL) {
400✔
2926
    rpcFreeCont(rpcRsp.pCont);
306✔
2927
  }
2928
  return code;
400✔
2929
}
2930

2931
// Add one table name, located inside `str`, to the per-database request map.
//
// pHash      - map keyed by "<acctId>.<db>" whose values are STablesReq entries.
// pos1/len1  - offset/length of the first name segment in `str`.
// pos2/len2  - offset/length of the second segment; len2 > 0 means the name is
//              written as "db.table" (segment 1 = db, segment 2 = table).
//              Otherwise segment 1 is the table and `db` supplies the database.
// acctId     - account id used to build the full database name.
// db         - default database; required (non-NULL) only when len2 <= 0.
//
// Returns TSDB_CODE_SUCCESS, -1 for an invalid/empty name, or an error code
// when an array or hash operation fails. A table array created by this call
// is released again if it cannot be stored in the map.
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
                      int32_t acctId, char* db) {
  SName name = {0};

  if (len1 <= 0) {
    return -1;
  }

  const char* dbName = db;
  const char* tbName = NULL;
  int32_t     dbLen = 0;
  int32_t     tbLen = 0;
  if (len2 > 0) {
    dbName = str + pos1;
    dbLen = len1;
    tbName = str + pos2;
    tbLen = len2;
  } else {
    if (NULL == db) {
      // unqualified table name but no default database to resolve it against
      return -1;
    }
    dbLen = strlen(db);
    tbName = str + pos1;
    tbLen = len1;
  }

  if (dbLen <= 0 || tbLen <= 0) {
    return -1;
  }

  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
    return -1;
  }

  if (tNameAddTbName(&name, tbName, tbLen)) {
    return -1;
  }

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);

  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
  if (pDb) {
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  // first table seen for this database: create its entry
  STablesReq newReq = {0};
  newReq.pTables = taosArrayInit(20, sizeof(SName));
  if (NULL == newReq.pTables) {
    return terrno;
  }
  tstrncpy(newReq.dbFName, dbFName, TSDB_DB_FNAME_LEN);

  if (NULL == taosArrayPush(newReq.pTables, &name)) {
    int32_t code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    taosArrayDestroy(newReq.pTables);  // not yet owned by the map
    return code;
  }

  int32_t code = taosHashPut(pHash, dbFName, strlen(dbFName), &newReq, sizeof(newReq));
  if (TSDB_CODE_SUCCESS != code) {
    taosArrayDestroy(newReq.pTables);  // the map did not take the entry
    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2989

2990
// Parse a comma separated table list and group the tables by database.
//
// tbList - NUL terminated text such as "t1, db2.t2, `my tb`". Each item is
//          "table" or "db.table"; names consist of [A-Za-z0-9_] or are quoted
//          with backticks; blanks, tabs and line breaks may follow a name.
// acctId - account id used when building full database names.
// dbName - default database for items without a database prefix.
// pReq   - on success receives a new array of STablesReq, one per database.
//          It is left untouched on failure.
//
// Returns TSDB_CODE_SUCCESS, terrno if the working hash cannot be created, or
// TSDB_CODE_TSC_INVALID_OPERATION (also stored in terrno) for any other
// failure. On failure every table array collected so far is released.
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return terrno;
  }

  bool    inEscape = false;
  int32_t code = 0;
  void*   pIter = NULL;
  SArray* pList = NULL;  // published through *pReq only once fully built

  // vPos/vLen describe up to two name segments of the current item:
  // [0] is the first segment, [1] the one after the dot (if any)
  int32_t vIdx = 0;
  int32_t vPos[2];
  int32_t vLen[2];

  (void)memset(vPos, -1, sizeof(vPos));
  (void)memset(vLen, 0, sizeof(vLen));

  for (int32_t i = 0;; ++i) {
    const char ch = *(tbList + i);

    if (0 == ch) {
      // end of input: close the open segment and flush the last item
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      break;
    }

    if ('`' == ch) {
      inEscape = !inEscape;
      if (!inEscape) {
        // closing backtick: the quoted segment must contain at least one character
        if (vPos[vIdx] >= 0) {
          vLen[vIdx] = i - vPos[vIdx];
        } else {
          goto _return;
        }
      }

      continue;
    }

    if (inEscape) {
      // any character is accepted inside backticks
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    if ('.' == ch) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      vIdx++;
      if (vIdx >= 2) {
        // more than one dot in a single item
        goto _return;
      }
      continue;
    }

    if (',' == ch) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      // start a fresh item
      (void)memset(vPos, -1, sizeof(vPos));
      (void)memset(vLen, 0, sizeof(vLen));
      vIdx = 0;
      continue;
    }

    if (' ' == ch || '\r' == ch || '\t' == ch || '\n' == ch) {
      // white space terminates a segment that has been started
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      continue;
    }

    if (('a' <= ch && 'z' >= ch) || ('A' <= ch && 'Z' >= ch) || ('0' <= ch && '9' >= ch) || ('_' == ch)) {
      if (vLen[vIdx] > 0) {
        // a name character after the segment was already closed
        goto _return;
      }
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    // unsupported character
    goto _return;
  }

  int32_t dbNum = taosHashGetSize(pHash);
  pList = taosArrayInit(dbNum, sizeof(STablesReq));
  if (NULL == pList) {
    code = terrno;
    goto _return;
  }

  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    if (NULL == taosArrayPush(pList, pDb)) {
      code = terrno;
      goto _return;
    }
    pIter = taosHashIterate(pHash, pIter);
  }

  // the table arrays are now referenced by pList; only the map itself is dropped
  taosHashCleanup(pHash);
  *pReq = pList;

  return TSDB_CODE_SUCCESS;

_return:

  terrno = TSDB_CODE_TSC_INVALID_OPERATION;

  // entries in pList are shallow copies of the map values, so the table arrays
  // are released exactly once through the map below
  taosArrayDestroy(pList);

  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    taosArrayDestroy(pDb->pTables);
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  return terrno;
}
3128

3129
// Completion callback for a catalog request issued by a waiting caller:
// stores `code` in the request referenced by `param` (an SSyncQueryParam) and
// posts the semaphore in that parameter block. `pResult` is not used here.
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
  SSyncQueryParam* pSync = (SSyncQueryParam*)param;

  pSync->pRequest->code = code;

  int32_t postCode = tsem_post(&pSync->sem);
  if (postCode != TSDB_CODE_SUCCESS) {
    tscError("failed to post semaphore since %s", tstrerror(terrno));
  }
}
2,400✔
3137

3138
void syncQueryFn(void* param, void* res, int32_t code) {
1,837,810,706✔
3139
  SSyncQueryParam* pParam = param;
1,837,810,706✔
3140
  pParam->pRequest = res;
1,837,810,706✔
3141

3142
  if (pParam->pRequest) {
1,837,817,548✔
3143
    pParam->pRequest->code = code;
1,837,818,978✔
3144
    clientOperateReport(pParam->pRequest);
1,837,833,134✔
3145
  }
3146

3147
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
1,837,774,770✔
UNCOV
3148
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3149
  }
3150
}
1,837,901,398✔
3151

3152
// Start an asynchronous query on connection `connId`.
//
// sql          - statement text; must not exceed tsMaxSQLLength bytes.
// fp           - completion callback; when it is NULL the call only sets terrno.
// param        - opaque pointer handed to buildRequest() and to `fp` on failure.
// validateOnly - forwarded to buildRequest().
// source       - stored in the request as pRequest->source.
//
// Every failure detected here sets terrno and, if `fp` is available, reports it
// as fp(param, NULL, terrno). On success the request is passed to doAsyncQuery().
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                        int8_t source) {
  if (NULL == fp) {
    // nobody to notify
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  const size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)tsMaxSQLLength) {
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, tsMaxSQLLength);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
  if (TSDB_CODE_SUCCESS == code) {
    // NOTE(review): if this check fails the request built above is not released here - confirm ownership
    code = connCheckAndUpateMetric(connId);
  }

  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->source = source;
  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
}
3192

3193
// Start an asynchronous query on connection `connId` using a caller supplied
// request id.
//
// sql          - statement text; must not exceed tsMaxSQLLength bytes.
// fp           - completion callback; when it is NULL the call only sets terrno.
// param        - opaque pointer handed to buildRequest() and to `fp` on failure.
// validateOnly - forwarded to buildRequest().
// reqid        - request id forwarded to buildRequest().
//
// Every failure detected here sets terrno and, if `fp` is available, reports it
// as fp(param, NULL, terrno). On success the request is passed to doAsyncQuery().
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                                 int64_t reqid) {
  if (NULL == fp) {
    // nobody to notify
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  const size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)tsMaxSQLLength) {
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid, tsMaxSQLLength);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
  if (TSDB_CODE_SUCCESS == code) {
    // NOTE(review): if this check fails the request built above is not released here - confirm ownership
    code = connCheckAndUpateMetric(connId);
  }

  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
}
3234

3235
// Blocking query: runs taosAsyncQueryImpl() with syncQueryFn as the callback
// and waits on a semaphore until that callback has fired.
//
// Returns the request produced by the query (marked as a sync query, with
// inCallback cleared), or NULL when `taos` is NULL (terrno is set to
// TSDB_CODE_TSC_DISCONNECTED), when the parameter block or semaphore cannot be
// set up, when waiting fails, or when the callback delivered no request.
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* pSync = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (NULL == pSync) {
    return NULL;
  }

  if (TSDB_CODE_SUCCESS != tsem_init(&pSync->sem, 0, 0)) {
    taosMemoryFree(pSync);
    return NULL;
  }

  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, pSync, validateOnly, source);

  if (TSDB_CODE_SUCCESS != tsem_wait(&pSync->sem)) {
    taosMemoryFree(pSync);
    return NULL;
  }

  int32_t code = tsem_destroy(&pSync->sem);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  // the callback stored the resulting request (possibly NULL) in the parameter block
  SRequestObj* pRequest = pSync->pRequest;
  if (NULL != pRequest) {
    pRequest->syncQuery = true;
    pRequest->inCallback = false;
  }
  taosMemoryFree(pSync);

  return pRequest;
}
3276

3277
// Blocking query with a caller supplied request id: runs
// taosAsyncQueryImplWithReqid() with syncQueryFn as the callback and waits on a
// semaphore until that callback has fired.
//
// Returns the request produced by the query (marked as a sync query, with
// inCallback cleared), or NULL when `taos` is NULL (terrno is set to
// TSDB_CODE_TSC_DISCONNECTED), when the parameter block or semaphore cannot be
// set up, when waiting fails, or when the callback delivered no request.
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (param == NULL) {
    return NULL;
  }
  int32_t code = tsem_init(&param->sem, 0, 0);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
  code = tsem_wait(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  // release the semaphore before its storage is freed, as taosQueryImpl does
  code = tsem_destroy(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  SRequestObj* pRequest = NULL;
  if (param->pRequest != NULL) {
    param->pRequest->syncQuery = true;
    pRequest = param->pRequest;
    // kept consistent with taosQueryImpl: the callback phase is over for the waiting caller
    param->pRequest->inCallback = false;
  }
  taosMemoryFree(param);

  return pRequest;
}
3311

3312
static void fetchCallback(void* pResult, void* param, int32_t code) {
492,020,764✔
3313
  SRequestObj* pRequest = (SRequestObj*)param;
492,020,764✔
3314

3315
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
492,020,764✔
3316

3317
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
492,020,786✔
3318
           tstrerror(code), pRequest->requestId);
3319

3320
  pResultInfo->pData = pResult;
492,020,786✔
3321
  pResultInfo->numOfRows = 0;
492,020,744✔
3322

3323
  if (code != TSDB_CODE_SUCCESS) {
492,020,662✔
UNCOV
3324
    pRequest->code = code;
×
UNCOV
3325
    taosMemoryFreeClear(pResultInfo->pData);
×
UNCOV
3326
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
UNCOV
3327
    return;
×
3328
  }
3329

3330
  if (pRequest->code != TSDB_CODE_SUCCESS) {
492,020,662✔
3331
    taosMemoryFreeClear(pResultInfo->pData);
×
3332
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
UNCOV
3333
    return;
×
3334
  }
3335

3336
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
501,235,122✔
3337
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
492,020,692✔
3338
  if (pRequest->code != TSDB_CODE_SUCCESS) {
492,019,670✔
3339
    pResultInfo->numOfRows = 0;
128✔
3340
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
128✔
3341
             tstrerror(pRequest->code), pRequest->requestId);
3342
  } else {
3343
    tscDebug(
492,017,786✔
3344
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3345
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3346

3347
    STscObj*            pTscObj = pRequest->pTscObj;
492,018,770✔
3348
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
492,020,554✔
3349
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
492,020,658✔
3350
  }
3351

3352
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
492,020,594✔
3353
}
3354

3355
// Fetch the next batch of rows for `pRequest` and report it through `fp`.
//
// `fp` and `param` are stored in the request first. The callback is invoked
// immediately with 0 rows when the request has no result fields or carries an
// error. When the result is already complete it is also invoked immediately:
// with the current numOfRows on the first call for a query that was not run
// through the scheduler, and with 0 rows otherwise. In all other cases a fetch
// is submitted to the scheduler with fetchCallback as completion handler.
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  pRequest->body.fetchFp = fp;
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;

  // this query has no results or error exists, return directly
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    pResultInfo->numOfRows = 0;
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
    return;
  }

  // all data has returned to App already, no need to try again
  if (pResultInfo->completed) {
    if (QUERY_EXEC_MODE_SCHEDULE == pRequest->body.execMode) {
      pResultInfo->numOfRows = 0;
    } else if (pResultInfo->localResultFetched) {
      // locally executed query whose single result batch was already handed out
      pResultInfo->numOfRows = 0;
      pResultInfo->current = 0;
    } else {
      // locally executed query: hand out the existing rows once
      pResultInfo->localResultFetched = true;
    }

    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
    return;
  }

  SSchedulerReq fetchReq = {
      .syncReq = false,
      .fetchFp = fetchCallback,
      .cbParam = pRequest,
  };

  if (TSDB_CODE_SUCCESS != schedulerFetchRows(pRequest->body.queryJob, &fetchReq)) {
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
    // NOTE(review): the user callback is not invoked on this path - confirm the scheduler reports the failure
  }
}
3398

3399
// Deliver the final result `code` of a request to its query callback.
//
// The request is flagged as being inside a callback for the duration of the
// call. When tsQueryTbNotExistAsEmpty is set, a "table not exist" error of a
// query request is turned into success with an empty-result type. After the
// callback the request is looked up again by its id, because it may no longer
// exist, and the flag is cleared only if it is still found.
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
  const int64_t selfRid = pRequest->self;

  pRequest->inCallback = true;

  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
    code = TSDB_CODE_SUCCESS;
  }

  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
           pRequest);

  if (NULL != pRequest->body.queryFp) {
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
  }

  SRequestObj* pAlive = acquireRequest(selfRid);
  if (NULL != pAlive) {
    pAlive->inCallback = false;
    (void)releaseRequest(selfRid);
  }
}
1,837,929,204✔
3421

3422
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
1,020,170✔
3423
                       SParseSqlRes* pRes) {
3424
#ifndef TD_ENTERPRISE
3425
  return TSDB_CODE_SUCCESS;
3426
#else
3427
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
1,020,170✔
3428
#endif
3429
}
3430

3431
// Stamp the access record with the current time in milliseconds: startTime is
// set only while it is still 0, lastAccessTime is always updated.
// A NULL record is ignored.
void updateConnAccessInfo(SConnAccessInfo* pInfo) {
  if (NULL == pInfo) {
    return;
  }

  const int64_t nowMs = taosGetTimestampMs();
  if (0 == pInfo->startTime) {
    pInfo->startTime = nowMs;
  }
  pInfo->lastAccessTime = nowMs;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc