• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4879

11 Dec 2025 02:43AM UTC coverage: 64.544% (-0.03%) from 64.569%
#4879

push

travis-ci

guanshengliang
feat(TS-7270): internal dependence

307 of 617 new or added lines in 24 files covered. (49.76%)

3883 existing lines in 125 files now uncovered.

163565 of 253417 relevant lines covered (64.54%)

105600506.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.19
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "decimal.h"
22
#include "scheduler.h"
23
#include "tdatablock.h"
24
#include "tdataformat.h"
25
#include "tdef.h"
26
#include "tglobal.h"
27
#include "tmisce.h"
28
#include "tmsg.h"
29
#include "tmsgtype.h"
30
#include "tpagedbuf.h"
31
#include "tref.h"
32
#include "tsched.h"
33
#include "tversion.h"
34

35
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
36
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode);
37

38
void setQueryRequest(int64_t rId) {
123,928,286✔
39
  SRequestObj* pReq = acquireRequest(rId);
123,928,286✔
40
  if (pReq != NULL) {
123,930,496✔
41
    pReq->isQuery = true;
123,919,940✔
42
    (void)releaseRequest(rId);
123,919,940✔
43
  }
44
}
123,930,238✔
45

46
static bool stringLengthCheck(const char* str, size_t maxsize) {
8,429,046✔
47
  if (str == NULL) {
8,429,046✔
48
    return false;
×
49
  }
50

51
  size_t len = strlen(str);
8,429,046✔
52
  if (len <= 0 || len > maxsize) {
8,429,046✔
53
    return false;
×
54
  }
55

56
  return true;
8,430,152✔
57
}
58

59
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
3,409,869✔
60

61
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_MAX_LEN); }
3,408,443✔
62

63
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
1,611,336✔
64

65
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
3,405,551✔
66
  char key[512] = {0};
3,405,551✔
67
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
3,406,470✔
68
  return taosStrdup(key);
3,406,470✔
69
}
70

71
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
662,287✔
72
  if (dst == NULL || src == NULL || srcLength == 0) {
662,287✔
73
    return 0;
543✔
74
  }
75
  
76
  size_t escapeLength = 0;
661,744✔
77
  for(size_t i = 0; i < srcLength; ++i) {
18,807,434✔
78
    if( src[i] == '\"' || src[i] == '\\' || src[i] == '\b' || src[i] == '\f' || src[i] == '\n' ||
18,145,690✔
79
        src[i] == '\r' || src[i] == '\t') {
18,145,690✔
80
      escapeLength += 1; 
×
81
    }    
82
  }
83

84
  size_t dstLength = srcLength;
661,744✔
85
  if(escapeLength == 0) {
661,744✔
86
     (void)memcpy(dst, src, srcLength);
661,744✔
87
  } else {
88
    dstLength = 0;
×
89
    for(size_t i = 0; i < srcLength && dstLength <= maxDstLength; i++) {
×
90
      switch(src[i]) {
×
91
        case '\"':
×
92
          dst[dstLength++] = '\\';
×
93
          dst[dstLength++] = '\"';
×
94
          break;
×
95
        case '\\':
×
96
          dst[dstLength++] = '\\';
×
97
          dst[dstLength++] = '\\';
×
98
          break;
×
99
        case '\b':
×
100
          dst[dstLength++] = '\\';
×
101
          dst[dstLength++] = 'b';
×
102
          break;
×
103
        case '\f':
×
104
          dst[dstLength++] = '\\';
×
105
          dst[dstLength++] = 'f';
×
106
          break;
×
107
        case '\n':
×
108
          dst[dstLength++] = '\\';
×
109
          dst[dstLength++] = 'n';
×
110
          break;
×
111
        case '\r':
×
112
          dst[dstLength++] = '\\';
×
113
          dst[dstLength++] = 'r';
×
114
          break;
×
115
        case '\t':
×
116
          dst[dstLength++] = '\\';
×
117
          dst[dstLength++] = 't';
×
118
          break;
×
119
        default:
×
120
           dst[dstLength++] = src[i];
×
121
      }
122
    }
123
  }
124

125
  return dstLength;
661,744✔
126
}
127

128
bool chkRequestKilled(void* param) {
2,147,483,647✔
129
  bool         killed = false;
2,147,483,647✔
130
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
131
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647✔
132
    killed = true;
×
133
  }
134

135
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
136

137
  return killed;
2,147,483,647✔
138
}
139

140
void cleanupAppInfo() {
1,340,412✔
141
  taosHashCleanup(appInfo.pInstMap);
1,340,412✔
142
  taosHashCleanup(appInfo.pInstMapByClusterId);
1,340,412✔
143
  tscInfo("cluster instance map cleaned");
1,340,412✔
144
}
1,340,412✔
145

146
static int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp, void* param,
147
                               SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
148

149
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* totp, const char* db,
3,409,942✔
150
                              uint16_t port, int connType, STscObj** pObj) {
151
  TSC_ERR_RET(taos_init());
3,409,942✔
152
  if (!validateUserName(user)) {
3,409,869✔
153
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
×
154
  }
155
  int32_t code = 0;
3,409,952✔
156

157
  char localDb[TSDB_DB_NAME_LEN] = {0};
3,409,952✔
158
  if (db != NULL && strlen(db) > 0) {
3,409,952✔
159
    if (!validateDbName(db)) {
1,611,336✔
160
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
×
161
    }
162

163
    tstrncpy(localDb, db, sizeof(localDb));
1,611,336✔
164
    (void)strdequote(localDb);
1,611,336✔
165
  }
166

167
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
3,409,734✔
168
  if (auth == NULL) {
3,410,068✔
169
    if (!validatePassword(pass)) {
3,409,234✔
170
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
×
171
    }
172

173
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
3,409,234✔
174
  } else {
175
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
834✔
176
  }
177

178
  int32_t totpCode = -1;
3,408,449✔
179
  if (totp != NULL) {
3,408,449✔
NEW
180
    char *endptr = NULL;
×
NEW
181
    totpCode = taosStr2Int32(totp, &endptr, 10);
×
NEW
182
    if (endptr == totp || *endptr != '\0' || totpCode < 0 || totpCode > 999999) {
×
NEW
183
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOTP_CODE);
×
184
    }
185
  }
186

187
  SCorEpSet epSet = {0};
3,408,449✔
188
  if (ip) {
3,407,569✔
189
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
1,192,773✔
190
  } else {
191
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
2,214,796✔
192
  }
193

194
  if (port) {
3,405,714✔
195
    epSet.epSet.eps[0].port = port;
149,972✔
196
    epSet.epSet.eps[1].port = port;
149,972✔
197
  }
198

199
  char* key = getClusterKey(user, secretEncrypt, ip, port);
3,405,714✔
200
  if (NULL == key) {
3,405,813✔
UNCOV
201
    TSC_ERR_RET(terrno);
×
202
  }
203
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
3,405,813✔
204
          user, db, key);
205
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
9,035,866✔
206
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
5,626,418✔
207
  }
208
  // for (int32_t i = 0; i < epSet.epSet.numOfEps; i++) {
209
  //   if ((code = taosValidFqdn(tsEnableIpv6, epSet.epSet.eps[i].fqdn)) != 0) {
210
  //     taosMemFree(key);
211
  //     tscError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6,
212
  //              epSet.epSet.eps[i].fqdn, tstrerror(code));
213
  //     TSC_ERR_RET(code);
214
  //   }
215
  // }
216

217
  SAppInstInfo** pInst = NULL;
3,409,448✔
218
  code = taosThreadMutexLock(&appInfo.mutex);
3,409,448✔
219
  if (TSDB_CODE_SUCCESS != code) {
3,409,448✔
UNCOV
220
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
UNCOV
221
    TSC_ERR_RET(code);
×
222
  }
223

224
  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
3,409,448✔
225
  SAppInstInfo* p = NULL;
3,409,448✔
226
  if (pInst == NULL) {
3,409,448✔
227
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
1,409,529✔
228
    if (NULL == p) {
1,409,529✔
229
      TSC_ERR_JRET(terrno);
×
230
    }
231
    p->mgmtEp = epSet;
1,409,529✔
232
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
1,409,529✔
233
    if (TSDB_CODE_SUCCESS != code) {
1,409,529✔
UNCOV
234
      taosMemoryFree(p);
×
UNCOV
235
      TSC_ERR_JRET(code);
×
236
    }
237
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
1,409,529✔
238
    if (TSDB_CODE_SUCCESS != code) {
1,409,529✔
239
      taosMemoryFree(p);
56✔
240
      TSC_ERR_JRET(code);
56✔
241
    }
242
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
1,409,473✔
243
    if (TSDB_CODE_SUCCESS != code) {
1,409,473✔
244
      destroyAppInst(&p);
×
UNCOV
245
      TSC_ERR_JRET(code);
×
246
    }
247
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
1,409,473✔
248
    if (TSDB_CODE_SUCCESS != code) {
1,409,473✔
UNCOV
249
      destroyAppInst(&p);
×
UNCOV
250
      TSC_ERR_JRET(code);
×
251
    }
252
    p->instKey = key;
1,409,473✔
253
    key = NULL;
1,409,473✔
254
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
1,409,473✔
255

256
    pInst = &p;
1,409,473✔
257
  } else {
258
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
1,999,919✔
259
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
×
UNCOV
260
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
×
261
    }
262
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
263
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
1,999,919✔
264
  }
265

266
_return:
3,409,448✔
267

268
  if (TSDB_CODE_SUCCESS != code) {
3,409,448✔
269
    (void)taosThreadMutexUnlock(&appInfo.mutex);
56✔
270
    taosMemoryFreeClear(key);
56✔
271
    return code;
56✔
272
  } else {
273
    code = taosThreadMutexUnlock(&appInfo.mutex);
3,409,392✔
274
    taosMemoryFreeClear(key);
3,409,392✔
275
    if (TSDB_CODE_SUCCESS != code) {
3,409,392✔
UNCOV
276
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
UNCOV
277
      return code;
×
278
    }
279
    return taosConnectImpl(user, &secretEncrypt[0], totpCode, localDb, NULL, NULL, *pInst, connType, pObj);
3,409,392✔
280
  }
281
}
282

283
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
284
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
285
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
286
//     return *ppAppInstInfo;
287
//   } else {
288
//     return NULL;
289
//   }
290
// }
291

292
void freeQueryParam(SSyncQueryParam* param) {
586,303✔
293
  if (param == NULL) return;
586,303✔
294
  if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
586,303✔
UNCOV
295
    tscError("failed to destroy semaphore in freeQueryParam");
×
296
  }
297
  taosMemoryFree(param);
586,303✔
298
}
299

300
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
653,732,749✔
301
                     SRequestObj** pRequest, int64_t reqid) {
302
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
653,732,749✔
303
  if (TSDB_CODE_SUCCESS != code) {
653,736,965✔
304
    tscError("failed to malloc sqlObj, %s", sql);
×
UNCOV
305
    return code;
×
306
  }
307

308
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
653,736,965✔
309
  if ((*pRequest)->sqlstr == NULL) {
653,733,166✔
UNCOV
310
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
UNCOV
311
    destroyRequest(*pRequest);
×
UNCOV
312
    *pRequest = NULL;
×
313
    return terrno;
×
314
  }
315

316
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
653,731,072✔
317
  (*pRequest)->sqlstr[sqlLen] = 0;
653,741,741✔
318
  (*pRequest)->sqlLen = sqlLen;
653,742,431✔
319
  (*pRequest)->validateOnly = validateSql;
653,744,154✔
320
  (*pRequest)->stmtBindVersion = 0;
653,742,376✔
321

322
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
653,740,178✔
323

324
  STscObj* pTscObj = (*pRequest)->pTscObj;
653,740,870✔
325
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
653,740,711✔
326
                             sizeof((*pRequest)->self));
327
  if (err) {
653,737,848✔
UNCOV
328
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
329
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
UNCOV
330
    destroyRequest(*pRequest);
×
UNCOV
331
    *pRequest = NULL;
×
UNCOV
332
    return terrno;
×
333
  }
334

335
  (*pRequest)->allocatorRefId = -1;
653,737,848✔
336
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
653,743,103✔
337
    if (TSDB_CODE_SUCCESS !=
186,682,359✔
338
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
186,674,218✔
339
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
340
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
341
      destroyRequest(*pRequest);
×
UNCOV
342
      *pRequest = NULL;
×
UNCOV
343
      return terrno;
×
344
    }
345
  }
346

347
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
653,750,332✔
348
  return TSDB_CODE_SUCCESS;
653,738,056✔
349
}
350

351
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
×
352
  int32_t code =
UNCOV
353
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
×
UNCOV
354
  if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
355
    pRequest->relation.prevRefId = (*pNewRequest)->self;
×
UNCOV
356
    (*pNewRequest)->relation.nextRefId = pRequest->self;
×
UNCOV
357
    (*pNewRequest)->relation.userRefId = pRequest->self;
×
UNCOV
358
    (*pNewRequest)->isSubReq = true;
×
359
  }
360
  return code;
×
361
}
362

363
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
1,082,493✔
364
  STscObj* pTscObj = pRequest->pTscObj;
1,082,493✔
365

366
  SParseContext cxt = {
1,083,179✔
367
      .requestId = pRequest->requestId,
1,082,601✔
368
      .requestRid = pRequest->self,
1,083,328✔
369
      .acctId = pTscObj->acctId,
1,082,860✔
370
      .db = pRequest->pDb,
1,081,816✔
371
      .topicQuery = topicQuery,
372
      .pSql = pRequest->sqlstr,
1,082,352✔
373
      .sqlLen = pRequest->sqlLen,
1,082,687✔
374
      .pMsg = pRequest->msgBuf,
1,082,018✔
375
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
376
      .pTransporter = pTscObj->pAppInfo->pTransporter,
1,082,119✔
377
      .pStmtCb = pStmtCb,
378
      .pUser = pTscObj->user,
1,083,104✔
379
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
1,082,047✔
380
      .enableSysInfo = pTscObj->sysInfo,
1,081,536✔
381
      .svrVer = pTscObj->sVer,
1,081,734✔
382
      .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
1,082,769✔
383
      .stmtBindVersion = pRequest->stmtBindVersion,
1,082,099✔
384
      .setQueryFp = setQueryRequest,
385
      .timezone = pTscObj->optionInfo.timezone,
1,082,242✔
386
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
1,082,085✔
387
  };
388

389
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
1,081,510✔
390
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
1,083,548✔
391
  if (code != TSDB_CODE_SUCCESS) {
1,082,697✔
UNCOV
392
    return code;
×
393
  }
394

395
  code = qParseSql(&cxt, pQuery);
1,082,697✔
396
  if (TSDB_CODE_SUCCESS == code) {
1,082,404✔
397
    if ((*pQuery)->haveResultSet) {
1,081,137✔
UNCOV
398
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols,
×
UNCOV
399
                              (*pQuery)->pResExtSchema, pRequest->stmtBindVersion > 0);
×
UNCOV
400
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
×
401
    }
402
  }
403

404
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
1,082,349✔
405
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
1,080,397✔
406
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
1,081,167✔
407
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
1,081,112✔
408
  }
409

410
  taosArrayDestroy(cxt.pTableMetaPos);
1,082,844✔
411
  taosArrayDestroy(cxt.pTableVgroupPos);
1,082,814✔
412

413
  return code;
1,083,180✔
414
}
415

UNCOV
416
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
×
UNCOV
417
  SRetrieveTableRsp* pRsp = NULL;
×
UNCOV
418
  int8_t             biMode = atomic_load_8(&pRequest->pTscObj->biMode);
×
UNCOV
419
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode,
×
UNCOV
420
                              pRequest->pTscObj->optionInfo.charsetCxt);
×
UNCOV
421
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
×
UNCOV
422
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
×
UNCOV
423
                                 pRequest->stmtBindVersion > 0);
×
424
  }
425

426
  return code;
×
427
}
428

429
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
406,540✔
430
  // drop table if exists not_exists_table
431
  if (NULL == pQuery->pCmdMsg) {
406,540✔
432
    return TSDB_CODE_SUCCESS;
×
433
  }
434

435
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
406,540✔
436
  pRequest->type = pMsgInfo->msgType;
406,540✔
437
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
406,540✔
438
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
406,540✔
439

440
  STscObj*      pTscObj = pRequest->pTscObj;
406,540✔
441
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
406,540✔
442

443
  // int64_t transporterId = 0;
444
  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
406,435✔
445
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
406,540✔
446
  return TSDB_CODE_SUCCESS;
406,643✔
447
}
448

449
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
1,232,057,413✔
450

451
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
5,666,117✔
452
  SRetrieveTableRsp* pRsp = NULL;
5,666,117✔
453
  if (pRequest->validateOnly) {
5,666,117✔
454
    doRequestCallback(pRequest, 0);
12,717✔
455
    return;
12,717✔
456
  }
457

458
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
11,209,678✔
459
                              atomic_load_8(&pRequest->pTscObj->biMode), pRequest->pTscObj->optionInfo.charsetCxt);
11,209,678✔
460
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
5,653,400✔
461
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
3,032,630✔
462
                                 pRequest->stmtBindVersion > 0);
3,032,630✔
463
  }
464

465
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
5,652,353✔
466
  pRequest->code = code;
5,652,353✔
467

468
  if (pRequest->code != TSDB_CODE_SUCCESS) {
5,652,353✔
469
    pResultInfo->numOfRows = 0;
3,530✔
470
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
3,530✔
471
             pRequest->requestId);
472
  } else {
473
    tscDebug(
5,649,079✔
474
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
475
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
476
  }
477

478
  doRequestCallback(pRequest, code);
5,652,609✔
479
}
480

481
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
18,282,758✔
482
  if (pRequest->validateOnly) {
18,282,758✔
UNCOV
483
    doRequestCallback(pRequest, 0);
×
UNCOV
484
    return TSDB_CODE_SUCCESS;
×
485
  }
486

487
  // drop table if exists not_exists_table
488
  if (NULL == pQuery->pCmdMsg) {
18,282,758✔
489
    doRequestCallback(pRequest, 0);
7,856✔
490
    return TSDB_CODE_SUCCESS;
7,856✔
491
  }
492

493
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
18,274,296✔
494
  pRequest->type = pMsgInfo->msgType;
18,274,902✔
495
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
18,274,902✔
496
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
18,274,429✔
497

498
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
18,274,902✔
499
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
18,273,663✔
500

501
  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
18,274,296✔
502
  if (code) {
18,275,056✔
UNCOV
503
    doRequestCallback(pRequest, code);
×
504
  }
505
  return code;
18,275,056✔
506
}
507

508
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
348,915✔
509
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
348,915✔
510
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
348,915✔
511

512
  if (node1->load < node2->load) {
348,915✔
UNCOV
513
    return -1;
×
514
  }
515

516
  return node1->load > node2->load;
348,915✔
517
}
518

519
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
110,861✔
520
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
110,861✔
521
  if (pInfo->pQnodeList) {
110,861✔
522
    taosArrayDestroy(pInfo->pQnodeList);
104,679✔
523
    pInfo->pQnodeList = NULL;
104,679✔
524
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
104,679✔
525
  }
526

527
  if (pNodeList) {
110,861✔
528
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
110,861✔
529
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
110,861✔
530
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
110,861✔
531
             taosArrayGetSize(pInfo->pQnodeList));
532
  }
533
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
110,861✔
534

535
  return TSDB_CODE_SUCCESS;
110,861✔
536
}
537

538
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
653,189,198✔
539
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
653,189,198✔
540
    *required = false;
652,356,111✔
541
    return TSDB_CODE_SUCCESS;
652,355,407✔
542
  }
543

544
  int32_t       code = TSDB_CODE_SUCCESS;
833,087✔
545
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
833,087✔
546
  *required = false;
833,087✔
547

548
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
833,087✔
549
  *required = (NULL == pInfo->pQnodeList);
833,087✔
550
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
833,087✔
551
  return TSDB_CODE_SUCCESS;
833,087✔
552
}
553

UNCOV
554
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
×
UNCOV
555
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
×
UNCOV
556
  int32_t       code = 0;
×
557

UNCOV
558
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
×
UNCOV
559
  if (pInfo->pQnodeList) {
×
UNCOV
560
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
×
561
  }
UNCOV
562
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
×
563
  if (NULL == *pNodeList) {
×
564
    SCatalog* pCatalog = NULL;
×
565
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
UNCOV
566
    if (TSDB_CODE_SUCCESS == code) {
×
567
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
×
568
      if (NULL == pNodeList) {
×
569
        TSC_ERR_RET(terrno);
×
570
      }
571
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
572
                               .requestId = pRequest->requestId,
×
573
                               .requestObjRefId = pRequest->self,
×
574
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
575
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
×
576
    }
577

578
    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
×
UNCOV
579
      code = updateQnodeList(pInfo, *pNodeList);
×
580
    }
581
  }
582

583
  return code;
×
584
}
585

586
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
12,828,974✔
587
  pRequest->type = pQuery->msgType;
12,828,974✔
588
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
12,829,789✔
589

590
  SPlanContext cxt = {.queryId = pRequest->requestId,
19,481,785✔
591
                      .acctId = pRequest->pTscObj->acctId,
12,829,727✔
592
                      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
12,830,076✔
593
                      .pAstRoot = pQuery->pRoot,
12,831,280✔
594
                      .showRewrite = pQuery->showRewrite,
12,831,611✔
595
                      .pMsg = pRequest->msgBuf,
12,831,556✔
596
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
597
                      .pUser = pRequest->pTscObj->user,
12,831,280✔
598
                      .timezone = pRequest->pTscObj->optionInfo.timezone,
12,830,453✔
599
                      .sysInfo = pRequest->pTscObj->sysInfo};
12,831,556✔
600

601
  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
12,830,728✔
602
}
603

604
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
130,110,479✔
605
                         const SExtSchema* pExtSchema, bool isStmt) {
606
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
130,110,479✔
UNCOV
607
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
UNCOV
608
    return TSDB_CODE_INVALID_PARA;
×
609
  }
610

611
  pResInfo->numOfCols = numOfCols;
130,111,766✔
612
  if (pResInfo->fields != NULL) {
130,110,645✔
613
    taosMemoryFree(pResInfo->fields);
23,356✔
614
  }
615
  if (pResInfo->userFields != NULL) {
130,110,896✔
616
    taosMemoryFree(pResInfo->userFields);
23,356✔
617
  }
618
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
130,109,798✔
619
  if (NULL == pResInfo->fields) return terrno;
130,108,538✔
620
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
130,108,625✔
621
  if (NULL == pResInfo->userFields) {
130,107,598✔
UNCOV
622
    taosMemoryFree(pResInfo->fields);
×
UNCOV
623
    return terrno;
×
624
  }
625
  if (numOfCols != pResInfo->numOfCols) {
130,099,474✔
UNCOV
626
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
UNCOV
627
    return TSDB_CODE_FAILED;
×
628
  }
629

630
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
773,691,427✔
631
    pResInfo->fields[i].type = pSchema[i].type;
643,585,326✔
632

633
    pResInfo->userFields[i].type = pSchema[i].type;
643,588,559✔
634
    // userFields must convert to type bytes, no matter isStmt or not
635
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
643,588,959✔
636
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
643,589,433✔
637
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
643,590,875✔
638
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
1,492,393✔
639
    }
640

641
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
643,585,698✔
642
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
643,590,598✔
643
  }
644
  return TSDB_CODE_SUCCESS;
130,111,008✔
645
}
646

647
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
91,030,839✔
648
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
91,030,839✔
649
      precision != TSDB_TIME_PRECISION_NANO) {
UNCOV
650
    return;
×
651
  }
652

653
  pResInfo->precision = precision;
91,030,839✔
654
}
655

656
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
100,699,672✔
657
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
100,699,672✔
658
  if (NULL == nodeList) {
100,704,156✔
659
    return terrno;
105✔
660
  }
661
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
100,704,388✔
662

663
  int32_t dbNum = taosArrayGetSize(pDbVgList);
100,704,388✔
664
  for (int32_t i = 0; i < dbNum; ++i) {
198,926,248✔
665
    SArray* pVg = taosArrayGetP(pDbVgList, i);
98,214,975✔
666
    if (NULL == pVg) {
98,217,030✔
UNCOV
667
      continue;
×
668
    }
669
    int32_t vgNum = taosArrayGetSize(pVg);
98,217,030✔
670
    if (vgNum <= 0) {
98,216,685✔
671
      continue;
706,028✔
672
    }
673

674
    for (int32_t j = 0; j < vgNum; ++j) {
325,270,201✔
675
      SVgroupInfo* pInfo = taosArrayGet(pVg, j);
227,756,622✔
676
      if (NULL == pInfo) {
227,758,726✔
UNCOV
677
        taosArrayDestroy(nodeList);
×
UNCOV
678
        return TSDB_CODE_OUT_OF_RANGE;
×
679
      }
680
      SQueryNodeLoad load = {0};
227,758,726✔
681
      load.addr.nodeId = pInfo->vgId;
227,756,739✔
682
      load.addr.epSet = pInfo->epSet;
227,757,432✔
683

684
      if (NULL == taosArrayPush(nodeList, &load)) {
227,752,647✔
UNCOV
685
        taosArrayDestroy(nodeList);
×
686
        return terrno;
×
687
      }
688
    }
689
  }
690

691
  int32_t vnodeNum = taosArrayGetSize(nodeList);
100,711,273✔
692
  if (vnodeNum > 0) {
100,710,444✔
693
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
97,195,946✔
694
    goto _return;
97,196,862✔
695
  }
696

697
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
3,514,498✔
698
  if (mnodeNum <= 0) {
3,511,856✔
UNCOV
699
    tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
×
UNCOV
700
    goto _return;
×
701
  }
702

703
  void* pData = taosArrayGet(pMnodeList, 0);
3,511,856✔
704
  if (NULL == pData) {
3,511,856✔
UNCOV
705
    taosArrayDestroy(nodeList);
×
UNCOV
706
    return TSDB_CODE_OUT_OF_RANGE;
×
707
  }
708
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
3,511,856✔
709
    taosArrayDestroy(nodeList);
×
UNCOV
710
    return terrno;
×
711
  }
712

713
  tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
3,511,856✔
714

715
_return:
120,274✔
716

717
  *pNodeList = nodeList;
100,707,761✔
718

719
  return TSDB_CODE_SUCCESS;
100,706,401✔
720
}
721

722
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
708,468✔
723
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
708,468✔
724
  if (NULL == nodeList) {
708,468✔
UNCOV
725
    return terrno;
×
726
  }
727

728
  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
708,468✔
729
  if (qNodeNum > 0) {
708,468✔
730
    void* pData = taosArrayGet(pQnodeList, 0);
614,738✔
731
    if (NULL == pData) {
614,738✔
UNCOV
732
      taosArrayDestroy(nodeList);
×
UNCOV
733
      return TSDB_CODE_OUT_OF_RANGE;
×
734
    }
735
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
614,738✔
UNCOV
736
      taosArrayDestroy(nodeList);
×
UNCOV
737
      return terrno;
×
738
    }
739
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
614,738✔
740
    goto _return;
614,738✔
741
  }
742

743
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
93,730✔
744
  if (mnodeNum <= 0) {
93,730✔
745
    tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
5,152✔
746
    goto _return;
5,152✔
747
  }
748

749
  void* pData = taosArrayGet(pMnodeList, 0);
88,578✔
750
  if (NULL == pData) {
88,578✔
UNCOV
751
    taosArrayDestroy(nodeList);
×
UNCOV
752
    return TSDB_CODE_OUT_OF_RANGE;
×
753
  }
754
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
88,578✔
UNCOV
755
    taosArrayDestroy(nodeList);
×
UNCOV
756
    return terrno;
×
757
  }
758

759
  tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
88,578✔
760

761
_return:
×
762

763
  *pNodeList = nodeList;
708,468✔
764

765
  return TSDB_CODE_SUCCESS;
708,468✔
766
}
767

768
void freeVgList(void* list) {
12,768,964✔
769
  SArray* pList = *(SArray**)list;
12,768,964✔
770
  taosArrayDestroy(pList);
12,769,744✔
771
}
12,774,471✔
772

773
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
88,585,263✔
774
  SArray* pDbVgList = NULL;
88,585,263✔
775
  SArray* pQnodeList = NULL;
88,585,263✔
776
  FDelete fp = NULL;
88,585,263✔
777
  int32_t code = 0;
88,585,263✔
778

779
  switch (tsQueryPolicy) {
88,585,263✔
780
    case QUERY_POLICY_VNODE:
87,876,029✔
781
    case QUERY_POLICY_CLIENT: {
782
      if (pResultMeta) {
87,876,029✔
783
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
87,876,743✔
784
        if (NULL == pDbVgList) {
87,876,741✔
UNCOV
785
          code = terrno;
×
UNCOV
786
          goto _return;
×
787
        }
788
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
87,876,741✔
789
        for (int32_t i = 0; i < dbNum; ++i) {
173,320,352✔
790
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
85,443,361✔
791
          if (pRes->code || NULL == pRes->pRes) {
85,442,777✔
792
            continue;
1,093✔
793
          }
794

795
          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
170,882,658✔
UNCOV
796
            code = terrno;
×
UNCOV
797
            goto _return;
×
798
          }
799
        }
800
      } else {
UNCOV
801
        fp = freeVgList;
×
802

UNCOV
803
        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
×
UNCOV
804
        if (dbNum > 0) {
×
805
          SCatalog*     pCtg = NULL;
×
806
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
×
UNCOV
807
          code = catalogGetHandle(pInst->clusterId, &pCtg);
×
UNCOV
808
          if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
809
            goto _return;
×
810
          }
811

812
          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
×
813
          if (NULL == pDbVgList) {
×
814
            code = terrno;
×
815
            goto _return;
×
816
          }
817
          SArray* pVgList = NULL;
×
818
          for (int32_t i = 0; i < dbNum; ++i) {
×
UNCOV
819
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
×
UNCOV
820
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
×
821
                                     .requestId = pRequest->requestId,
×
822
                                     .requestObjRefId = pRequest->self,
×
823
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
×
824

825
            // catalogGetDBVgList will handle dbFName == null.
826
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
×
827
            if (code) {
×
828
              goto _return;
×
829
            }
830

831
            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
×
832
              code = terrno;
×
UNCOV
833
              goto _return;
×
834
            }
835
          }
836
        }
837
      }
838

839
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
87,876,991✔
840
      break;
87,877,806✔
841
    }
842
    case QUERY_POLICY_HYBRID:
708,468✔
843
    case QUERY_POLICY_QNODE: {
844
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
808,704✔
845
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
100,236✔
846
        if (pRes->code) {
100,236✔
UNCOV
847
          pQnodeList = NULL;
×
848
        } else {
849
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
100,236✔
850
          if (NULL == pQnodeList) {
100,236✔
UNCOV
851
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
852
            goto _return;
×
853
          }
854
        }
855
      } else {
856
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
608,232✔
857
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
608,232✔
858
        if (pInst->pQnodeList) {
608,232✔
859
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
608,232✔
860
          if (NULL == pQnodeList) {
608,232✔
861
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
862
            goto _return;
×
863
          }
864
        }
865
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
608,232✔
866
      }
867

868
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
708,468✔
869
      break;
708,468✔
870
    }
871
    default:
766✔
872
      tscError("unknown query policy: %d", tsQueryPolicy);
766✔
UNCOV
873
      return TSDB_CODE_APP_ERROR;
×
874
  }
875

876
_return:
88,586,274✔
877
  taosArrayDestroyEx(pDbVgList, fp);
88,586,274✔
878
  taosArrayDestroy(pQnodeList);
88,586,567✔
879

880
  return code;
88,586,790✔
881
}
882

883
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
12,821,824✔
884
  SArray* pDbVgList = NULL;
12,821,824✔
885
  SArray* pQnodeList = NULL;
12,821,824✔
886
  int32_t code = 0;
12,822,058✔
887

888
  switch (tsQueryPolicy) {
12,822,058✔
889
    case QUERY_POLICY_VNODE:
12,822,879✔
890
    case QUERY_POLICY_CLIENT: {
891
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
12,822,879✔
892
      if (dbNum > 0) {
12,828,925✔
893
        SCatalog*     pCtg = NULL;
12,773,160✔
894
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
12,773,007✔
895
        code = catalogGetHandle(pInst->clusterId, &pCtg);
12,773,393✔
896
        if (code != TSDB_CODE_SUCCESS) {
12,769,740✔
UNCOV
897
          goto _return;
×
898
        }
899

900
        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
12,769,740✔
901
        if (NULL == pDbVgList) {
12,772,946✔
UNCOV
902
          code = terrno;
×
UNCOV
903
          goto _return;
×
904
        }
905
        SArray* pVgList = NULL;
12,772,946✔
906
        for (int32_t i = 0; i < dbNum; ++i) {
25,545,931✔
907
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
12,767,146✔
908
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
12,770,332✔
909
                                   .requestId = pRequest->requestId,
12,770,387✔
910
                                   .requestObjRefId = pRequest->self,
12,769,784✔
911
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
12,771,329✔
912

913
          // catalogGetDBVgList will handle dbFName == null.
914
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
12,775,072✔
915
          if (code) {
12,774,548✔
UNCOV
916
            goto _return;
×
917
          }
918

919
          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
12,774,819✔
UNCOV
920
            code = terrno;
×
UNCOV
921
            goto _return;
×
922
          }
923
        }
924
      }
925

926
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
12,834,865✔
927
      break;
12,828,012✔
928
    }
929
    case QUERY_POLICY_HYBRID:
×
930
    case QUERY_POLICY_QNODE: {
UNCOV
931
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
×
932

UNCOV
933
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
×
UNCOV
934
      break;
×
935
    }
936
    default:
416✔
937
      tscError("unknown query policy: %d", tsQueryPolicy);
416✔
938
      return TSDB_CODE_APP_ERROR;
×
939
  }
940

941
_return:
12,828,042✔
942

943
  taosArrayDestroyEx(pDbVgList, freeVgList);
12,827,049✔
944
  taosArrayDestroy(pQnodeList);
12,828,533✔
945

946
  return code;
12,829,848✔
947
}
948

949
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
12,824,013✔
950
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
12,824,013✔
951

952
  SExecResult      res = {0};
12,825,112✔
953
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
12,825,229✔
954
                           .requestId = pRequest->requestId,
12,825,088✔
955
                           .requestObjRefId = pRequest->self};
12,824,458✔
956
  SSchedulerReq    req = {
19,472,929✔
957
         .syncReq = true,
958
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
12,825,088✔
959
         .pConn = &conn,
960
         .pNodeList = pNodeList,
961
         .pDag = pDag,
962
         .sql = pRequest->sqlstr,
12,825,088✔
963
         .startTs = pRequest->metric.start,
12,824,936✔
964
         .execFp = NULL,
965
         .cbParam = NULL,
966
         .chkKillFp = chkRequestKilled,
967
         .chkKillParam = (void*)pRequest->self,
12,824,336✔
968
         .pExecRes = &res,
969
         .source = pRequest->source,
12,825,069✔
970
         .pWorkerCb = getTaskPoolWorkerCb(),
12,825,399✔
971
  };
972

973
  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
12,825,728✔
974

975
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
12,830,599✔
976
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
12,830,527✔
977

978
  if (code != TSDB_CODE_SUCCESS) {
12,830,196✔
UNCOV
979
    schedulerFreeJob(&pRequest->body.queryJob, 0);
×
980

UNCOV
981
    pRequest->code = code;
×
UNCOV
982
    terrno = code;
×
UNCOV
983
    return pRequest->code;
×
984
  }
985

986
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
12,830,196✔
987
      TDMT_VND_CREATE_TABLE == pRequest->type) {
17,849✔
988
    pRequest->body.resInfo.numOfRows = res.numOfRows;
12,818,371✔
989
    if (TDMT_VND_SUBMIT == pRequest->type) {
12,818,705✔
990
      STscObj*            pTscObj = pRequest->pTscObj;
12,811,938✔
991
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
12,811,790✔
992
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
12,812,490✔
993
    }
994

995
    schedulerFreeJob(&pRequest->body.queryJob, 0);
12,819,621✔
996
  }
997

998
  pRequest->code = res.code;
12,830,877✔
999
  terrno = res.code;
12,830,877✔
1000
  return pRequest->code;
12,829,942✔
1001
}
1002

1003
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
477,261,682✔
1004
  SArray*      pArray = NULL;
477,261,682✔
1005
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
477,261,682✔
1006
  if (NULL == pRsp->aCreateTbRsp) {
477,261,682✔
1007
    return TSDB_CODE_SUCCESS;
467,719,067✔
1008
  }
1009

1010
  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
9,548,571✔
1011
  for (int32_t i = 0; i < tbNum; ++i) {
22,801,553✔
1012
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
13,250,305✔
1013
    if (pTbRsp->pMeta) {
13,250,091✔
1014
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
12,501,233✔
1015
    }
1016
  }
1017

1018
  return TSDB_CODE_SUCCESS;
9,551,248✔
1019
}
1020

1021
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
73,870,419✔
1022
  int32_t code = 0;
73,870,419✔
1023
  SArray* pArray = NULL;
73,870,419✔
1024
  SArray* pTbArray = (SArray*)res;
73,870,419✔
1025
  int32_t tbNum = taosArrayGetSize(pTbArray);
73,870,419✔
1026
  if (tbNum <= 0) {
73,869,706✔
UNCOV
1027
    return TSDB_CODE_SUCCESS;
×
1028
  }
1029

1030
  pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
73,869,706✔
1031
  if (NULL == pArray) {
73,869,273✔
UNCOV
1032
    return terrno;
×
1033
  }
1034

1035
  for (int32_t i = 0; i < tbNum; ++i) {
198,713,536✔
1036
    STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
124,846,042✔
1037
    if (NULL == tbInfo) {
124,844,851✔
UNCOV
1038
      code = terrno;
×
UNCOV
1039
      goto _return;
×
1040
    }
1041
    STbSVersion tbSver = {
124,844,851✔
1042
        .tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion, .rver = tbInfo->rversion};
124,845,121✔
1043
    if (NULL == taosArrayPush(pArray, &tbSver)) {
124,845,566✔
UNCOV
1044
      code = terrno;
×
UNCOV
1045
      goto _return;
×
1046
    }
1047
  }
1048

1049
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
73,867,494✔
1050
                           .requestId = pRequest->requestId,
73,870,698✔
1051
                           .requestObjRefId = pRequest->self,
73,869,765✔
1052
                           .mgmtEps = *epset};
1053

1054
  code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
73,868,487✔
1055

1056
_return:
73,870,166✔
1057

1058
  taosArrayDestroy(pArray);
73,869,137✔
1059
  return code;
73,868,862✔
1060
}
1061

1062
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
8,915,370✔
1063
  return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
8,915,370✔
1064
}
1065

1066
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
57,905,379✔
1067
  return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
57,905,379✔
1068
}
1069

1070
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
620,271,475✔
1071
  if (NULL == pRequest->body.resInfo.execRes.res) {
620,271,475✔
1072
    return pRequest->code;
26,387,436✔
1073
  }
1074

1075
  SCatalog*     pCatalog = NULL;
593,886,400✔
1076
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
593,890,200✔
1077

1078
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
593,901,971✔
1079
  if (code) {
593,891,997✔
UNCOV
1080
    return code;
×
1081
  }
1082

1083
  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
593,891,997✔
1084
  SExecResult* pRes = &pRequest->body.resInfo.execRes;
593,902,892✔
1085

1086
  switch (pRes->msgType) {
593,902,544✔
1087
    case TDMT_VND_ALTER_TABLE:
3,924,719✔
1088
    case TDMT_MND_ALTER_STB: {
1089
      code = handleAlterTbExecRes(pRes->res, pCatalog);
3,924,719✔
1090
      break;
3,924,719✔
1091
    }
1092
    case TDMT_VND_CREATE_TABLE: {
38,462,639✔
1093
      SArray* pList = (SArray*)pRes->res;
38,462,639✔
1094
      int32_t num = taosArrayGetSize(pList);
38,464,184✔
1095
      for (int32_t i = 0; i < num; ++i) {
81,804,583✔
1096
        void* res = taosArrayGetP(pList, i);
43,334,744✔
1097
        // handleCreateTbExecRes will handle res == null
1098
        code = handleCreateTbExecRes(res, pCatalog);
43,337,510✔
1099
      }
1100
      break;
38,469,839✔
1101
    }
1102
    case TDMT_MND_CREATE_STB: {
369,870✔
1103
      code = handleCreateTbExecRes(pRes->res, pCatalog);
369,870✔
1104
      break;
369,870✔
1105
    }
1106
    case TDMT_VND_SUBMIT: {
477,262,969✔
1107
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
477,262,969✔
1108

1109
      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
477,270,902✔
1110
      break;
477,268,711✔
1111
    }
1112
    case TDMT_SCH_QUERY:
73,869,903✔
1113
    case TDMT_SCH_MERGE_QUERY: {
1114
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
73,869,903✔
1115
      break;
73,864,328✔
1116
    }
1117
    default:
538✔
1118
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
538✔
1119
               pRequest->type, pRequest->requestId);
UNCOV
1120
      code = TSDB_CODE_APP_ERROR;
×
1121
  }
1122

1123
  return code;
593,897,467✔
1124
}
1125

1126
static bool incompletaFileParsing(SNode* pStmt) {
606,040,325✔
1127
  return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
606,040,325✔
1128
}
1129

UNCOV
1130
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
×
UNCOV
1131
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
×
1132

UNCOV
1133
  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
×
UNCOV
1134
  if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1135
    int64_t analyseStart = taosGetTimestampUs();
×
UNCOV
1136
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
×
UNCOV
1137
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
×
1138
  }
1139

1140
  if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
1141
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
×
1142
  }
1143

1144
  code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
×
1145
  handleQueryAnslyseRes(pWrapper, NULL, code);
×
1146
}
×
1147

1148
void returnToUser(SRequestObj* pRequest) {
12,731,943✔
1149
  if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
12,731,943✔
1150
    // return to client
1151
    doRequestCallback(pRequest, pRequest->code);
12,731,943✔
1152
    return;
12,731,943✔
1153
  }
1154

1155
  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
×
UNCOV
1156
  if (pUserReq) {
×
UNCOV
1157
    pUserReq->code = pRequest->code;
×
1158
    // return to client
UNCOV
1159
    doRequestCallback(pUserReq, pUserReq->code);
×
UNCOV
1160
    (void)releaseRequest(pRequest->relation.userRefId);
×
UNCOV
1161
    return;
×
1162
  } else {
UNCOV
1163
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1164
             pRequest->relation.userRefId, pRequest->requestId);
1165
  }
1166
}
1167

1168
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
×
1169
  int64_t     lastTs = 0;
×
1170
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
×
UNCOV
1171
  int32_t     numOfFields = taos_num_fields(pRes);
×
1172

UNCOV
1173
  int32_t code = createDataBlock(pBlock);
×
UNCOV
1174
  if (code) {
×
UNCOV
1175
    return code;
×
1176
  }
1177

1178
  for (int32_t i = 0; i < numOfFields; ++i) {
×
1179
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
×
1180
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
×
UNCOV
1181
    if (TSDB_CODE_SUCCESS != code) {
×
1182
      blockDataDestroy(*pBlock);
×
1183
      return code;
×
1184
    }
1185
  }
1186

1187
  code = blockDataEnsureCapacity(*pBlock, numOfRows);
×
1188
  if (TSDB_CODE_SUCCESS != code) {
×
1189
    blockDataDestroy(*pBlock);
×
1190
    return code;
×
1191
  }
1192

UNCOV
1193
  for (int32_t i = 0; i < numOfRows; ++i) {
×
UNCOV
1194
    TAOS_ROW pRow = taos_fetch_row(pRes);
×
UNCOV
1195
    if (NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
×
1196
      tscError("invalid data from vnode");
×
1197
      blockDataDestroy(*pBlock);
×
1198
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1199
    }
UNCOV
1200
    int64_t ts = *(int64_t*)pRow[0];
×
UNCOV
1201
    if (lastTs < ts) {
×
1202
      lastTs = ts;
×
1203
    }
1204

1205
    for (int32_t j = 0; j < numOfFields; ++j) {
×
1206
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
×
1207
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
×
UNCOV
1208
      if (TSDB_CODE_SUCCESS != code) {
×
1209
        blockDataDestroy(*pBlock);
×
1210
        return code;
×
1211
      }
1212
    }
1213

1214
    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
×
1215
            *(int64_t*)pRow[2]);
1216
  }
1217

1218
  (*pBlock)->info.window.ekey = lastTs;
×
1219
  (*pBlock)->info.rows = numOfRows;
×
1220

UNCOV
1221
  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
×
UNCOV
1222
  return TSDB_CODE_SUCCESS;
×
1223
}
1224

UNCOV
1225
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
×
UNCOV
1226
  SRequestObj* pRequest = (SRequestObj*)res;
×
1227
  if (pRequest->code) {
×
1228
    returnToUser(pRequest);
×
UNCOV
1229
    return;
×
1230
  }
1231

UNCOV
1232
  SSDataBlock* pBlock = NULL;
×
UNCOV
1233
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
×
1234
  if (TSDB_CODE_SUCCESS != pRequest->code) {
×
1235
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
×
1236
             tstrerror(pRequest->code));
1237
    returnToUser(pRequest);
×
1238
    return;
×
1239
  }
1240

1241
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1242
  if (pNextReq) {
×
1243
    continuePostSubQuery(pNextReq, pBlock);
×
1244
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1245
  } else {
1246
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1247
             pRequest->relation.nextRefId, pRequest->requestId);
1248
  }
1249

1250
  blockDataDestroy(pBlock);
×
1251
}
1252

1253
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
×
UNCOV
1254
  SRequestObj* pRequest = pWrapper->pRequest;
×
1255
  if (TD_RES_QUERY(pRequest)) {
×
UNCOV
1256
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
×
UNCOV
1257
    return;
×
1258
  }
1259

UNCOV
1260
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
UNCOV
1261
  if (pNextReq) {
×
1262
    continuePostSubQuery(pNextReq, NULL);
×
1263
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1264
  } else {
1265
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1266
             pRequest->relation.nextRefId, pRequest->requestId);
1267
  }
1268
}
1269

1270
// todo refacto the error code  mgmt
1271
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
607,097,288✔
1272
  SSqlCallbackWrapper* pWrapper = param;
607,097,288✔
1273
  SRequestObj*         pRequest = pWrapper->pRequest;
607,097,288✔
1274
  STscObj*             pTscObj = pRequest->pTscObj;
607,104,182✔
1275

1276
  pRequest->code = code;
607,104,454✔
1277
  if (pResult) {
607,108,151✔
1278
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
607,069,654✔
1279
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
607,077,433✔
1280
  }
1281

1282
  int32_t type = pRequest->type;
607,102,679✔
1283
  if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
607,088,950✔
1284
    if (pResult) {
506,473,744✔
1285
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;
506,457,508✔
1286

1287
      // record the insert rows
1288
      if (TDMT_VND_SUBMIT == type) {
506,462,331✔
1289
        SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
464,599,463✔
1290
        (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
464,601,659✔
1291
      }
1292
    }
1293
    schedulerFreeJob(&pRequest->body.queryJob, 0);
506,482,276✔
1294
  }
1295

1296
  taosMemoryFree(pResult);
607,101,187✔
1297
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
607,097,883✔
1298
           pRequest->requestId);
1299

1300
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL &&
607,095,272✔
1301
      pRequest->stmtBindVersion == 0) {
50,626✔
1302
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
50,626✔
1303
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
1304
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
50,626✔
UNCOV
1305
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1306
    }
1307
    restartAsyncQuery(pRequest, code);
50,626✔
1308
    return;
50,626✔
1309
  }
1310

1311
  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
607,044,646✔
1312
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
607,044,646✔
1313
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
2,916,422✔
1314
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1315
    }
1316
  }
1317

1318
  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
607,055,061✔
1319
  int32_t code1 = handleQueryExecRsp(pRequest);
607,044,351✔
1320
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
607,051,774✔
UNCOV
1321
    pRequest->code = code1;
×
1322
  }
1323

1324
  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
1,213,093,205✔
1325
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
606,037,341✔
1326
    continueInsertFromCsv(pWrapper, pRequest);
12,066✔
1327
    return;
12,066✔
1328
  }
1329

1330
  if (pRequest->relation.nextRefId) {
607,047,964✔
UNCOV
1331
    handlePostSubQuery(pWrapper);
×
1332
  } else {
1333
    destorySqlCallbackWrapper(pWrapper);
607,045,711✔
1334
    pRequest->pWrapper = NULL;
607,037,761✔
1335

1336
    // return to client
1337
    doRequestCallback(pRequest, code);
607,040,640✔
1338
  }
1339
}
1340

1341
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
13,232,888✔
1342
  int32_t code = 0;
13,232,888✔
1343
  int32_t subplanNum = 0;
13,232,888✔
1344

1345
  if (pQuery->pRoot) {
13,232,888✔
1346
    pRequest->stmtType = pQuery->pRoot->type;
12,830,091✔
1347
  }
1348

1349
  if (pQuery->pRoot && !pRequest->inRetry) {
13,233,580✔
1350
    STscObj*            pTscObj = pRequest->pTscObj;
12,829,370✔
1351
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
12,827,919✔
1352
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
12,829,364✔
1353
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
12,819,518✔
1354
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
10,621✔
1355
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
10,800✔
1356
    }
1357
  }
1358

1359
  pRequest->body.execMode = pQuery->execMode;
13,237,073✔
1360
  switch (pQuery->execMode) {
13,236,861✔
UNCOV
1361
    case QUERY_EXEC_MODE_LOCAL:
×
UNCOV
1362
      if (!pRequest->validateOnly) {
×
UNCOV
1363
        if (NULL == pQuery->pRoot) {
×
UNCOV
1364
          terrno = TSDB_CODE_INVALID_PARA;
×
UNCOV
1365
          code = terrno;
×
1366
        } else {
UNCOV
1367
          code = execLocalCmd(pRequest, pQuery);
×
1368
        }
1369
      }
1370
      break;
×
1371
    case QUERY_EXEC_MODE_RPC:
406,540✔
1372
      if (!pRequest->validateOnly) {
406,540✔
1373
        code = execDdlQuery(pRequest, pQuery);
406,540✔
1374
      }
1375
      break;
406,643✔
1376
    case QUERY_EXEC_MODE_SCHEDULE: {
12,829,303✔
1377
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
12,829,303✔
1378
      if (NULL == pMnodeList) {
12,830,332✔
1379
        code = terrno;
×
UNCOV
1380
        break;
×
1381
      }
1382
      SQueryPlan* pDag = NULL;
12,830,332✔
1383
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
12,830,608✔
1384
      if (TSDB_CODE_SUCCESS == code) {
12,827,231✔
1385
        pRequest->body.subplanNum = pDag->numOfSubplans;
12,827,117✔
1386
        if (!pRequest->validateOnly) {
12,828,518✔
1387
          SArray* pNodeList = NULL;
12,827,047✔
1388
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
12,826,961✔
1389
          if (TSDB_CODE_SUCCESS == code) {
12,828,115✔
1390
            code = scheduleQuery(pRequest, pDag, pNodeList);
12,828,938✔
1391
          }
1392
          taosArrayDestroy(pNodeList);
12,828,645✔
1393
        }
1394
      }
1395
      taosArrayDestroy(pMnodeList);
12,829,616✔
1396
      break;
12,830,485✔
1397
    }
UNCOV
1398
    case QUERY_EXEC_MODE_EMPTY_RESULT:
×
UNCOV
1399
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
UNCOV
1400
      break;
×
UNCOV
1401
    default:
×
UNCOV
1402
      break;
×
1403
  }
1404

1405
  if (!keepQuery) {
13,237,183✔
UNCOV
1406
    qDestroyQuery(pQuery);
×
1407
  }
1408

1409
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
13,237,183✔
1410
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
36,954✔
1411
    if (TSDB_CODE_SUCCESS != ret) {
36,954✔
UNCOV
1412
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
×
1413
               pRequest->requestId);
1414
    }
1415
  }
1416

1417
  if (TSDB_CODE_SUCCESS == code) {
13,236,424✔
1418
    code = handleQueryExecRsp(pRequest);
13,235,018✔
1419
  }
1420

1421
  if (TSDB_CODE_SUCCESS != code) {
13,235,633✔
1422
    pRequest->code = code;
30,550✔
1423
  }
1424

1425
  if (res) {
13,235,633✔
UNCOV
1426
    *res = pRequest->body.resInfo.execRes.res;
×
UNCOV
1427
    pRequest->body.resInfo.execRes.res = NULL;
×
1428
  }
1429
}
13,235,633✔
1430

1431
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
607,588,161✔
1432
                                 SSqlCallbackWrapper* pWrapper) {
1433
  int32_t code = TSDB_CODE_SUCCESS;
607,588,161✔
1434
  pRequest->type = pQuery->msgType;
607,588,161✔
1435
  SArray*     pMnodeList = NULL;
607,596,759✔
1436
  SQueryPlan* pDag = NULL;
607,596,759✔
1437
  int64_t     st = taosGetTimestampUs();
607,584,312✔
1438

1439
  if (!pRequest->parseOnly) {
607,584,312✔
1440
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
607,588,179✔
1441
    if (NULL == pMnodeList) {
607,570,942✔
UNCOV
1442
      code = terrno;
×
1443
    }
1444
    SPlanContext cxt = {.queryId = pRequest->requestId,
655,131,800✔
1445
                        .acctId = pRequest->pTscObj->acctId,
607,601,569✔
1446
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
607,605,993✔
1447
                        .pAstRoot = pQuery->pRoot,
607,614,749✔
1448
                        .showRewrite = pQuery->showRewrite,
607,616,751✔
1449
                        .isView = pWrapper->pParseCtx->isView,
607,599,974✔
1450
                        .isAudit = pWrapper->pParseCtx->isAudit,
607,609,217✔
1451
                        .pMsg = pRequest->msgBuf,
607,595,746✔
1452
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1453
                        .pUser = pRequest->pTscObj->user,
607,605,932✔
1454
                        .sysInfo = pRequest->pTscObj->sysInfo,
607,603,401✔
1455
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
607,607,569✔
1456
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
607,598,037✔
1457
    if (TSDB_CODE_SUCCESS == code) {
607,607,542✔
1458
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
607,608,185✔
1459
    }
1460
    if (code) {
607,591,113✔
1461
      tscError("req:0x%" PRIx64 ", failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
273,906✔
1462
               pRequest->requestId);
1463
    } else {
1464
      pRequest->body.subplanNum = pDag->numOfSubplans;
607,317,207✔
1465
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
607,329,426✔
1466
    }
1467
  }
1468

1469
  pRequest->metric.execStart = taosGetTimestampUs();
607,606,365✔
1470
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
607,607,752✔
1471

1472
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
631,360,962✔
1473
    SArray* pNodeList = NULL;
607,077,022✔
1474
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
607,089,123✔
1475
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
88,585,267✔
1476
    }
1477

1478
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
607,082,706✔
1479
                             .requestId = pRequest->requestId,
607,091,940✔
1480
                             .requestObjRefId = pRequest->self};
607,093,520✔
1481
    SSchedulerReq    req = {
630,857,570✔
1482
           .syncReq = false,
1483
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
607,082,716✔
1484
           .pConn = &conn,
1485
           .pNodeList = pNodeList,
1486
           .pDag = pDag,
1487
           .allocatorRefId = pRequest->allocatorRefId,
607,082,716✔
1488
           .sql = pRequest->sqlstr,
607,067,179✔
1489
           .startTs = pRequest->metric.start,
607,081,119✔
1490
           .execFp = schedulerExecCb,
1491
           .cbParam = pWrapper,
1492
           .chkKillFp = chkRequestKilled,
1493
           .chkKillParam = (void*)pRequest->self,
607,068,753✔
1494
           .pExecRes = NULL,
1495
           .source = pRequest->source,
607,059,690✔
1496
           .pWorkerCb = getTaskPoolWorkerCb(),
607,060,760✔
1497
    };
1498
    if (TSDB_CODE_SUCCESS == code) {
607,071,493✔
1499
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
607,099,165✔
1500
    }
1501

1502
    taosArrayDestroy(pNodeList);
607,070,568✔
1503
  } else {
1504
    qDestroyQueryPlan(pDag);
532,165✔
1505
    tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
511,341✔
1506
             pRequest->requestId);
1507
    destorySqlCallbackWrapper(pWrapper);
511,341✔
1508
    pRequest->pWrapper = NULL;
511,341✔
1509
    if (TSDB_CODE_SUCCESS != code) {
511,341✔
1510
      pRequest->code = terrno;
273,906✔
1511
    }
1512

1513
    doRequestCallback(pRequest, code);
511,341✔
1514
  }
1515

1516
  // todo not to be released here
1517
  taosArrayDestroy(pMnodeList);
607,609,923✔
1518

1519
  return code;
607,600,809✔
1520
}
1521

1522
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
632,207,485✔
1523
  int32_t code = 0;
632,207,485✔
1524

1525
  if (pRequest->parseOnly) {
632,207,485✔
1526
    doRequestCallback(pRequest, 0);
303,291✔
1527
    return;
303,291✔
1528
  }
1529

1530
  pRequest->body.execMode = pQuery->execMode;
631,917,732✔
1531
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
631,916,749✔
1532
    destorySqlCallbackWrapper(pWrapper);
24,316,649✔
1533
    pRequest->pWrapper = NULL;
24,317,255✔
1534
  }
1535

1536
  if (pQuery->pRoot && !pRequest->inRetry) {
631,892,985✔
1537
    STscObj*            pTscObj = pRequest->pTscObj;
631,903,787✔
1538
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
631,921,800✔
1539
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
631,911,494✔
1540
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
518,505,675✔
1541
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
464,557,112✔
1542
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
167,349,448✔
1543
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
82,558,907✔
1544
    }
1545
  }
1546

1547
  switch (pQuery->execMode) {
631,930,887✔
1548
    case QUERY_EXEC_MODE_LOCAL:
5,666,117✔
1549
      asyncExecLocalCmd(pRequest, pQuery);
5,666,117✔
1550
      break;
5,666,117✔
1551
    case QUERY_EXEC_MODE_RPC:
18,282,912✔
1552
      code = asyncExecDdlQuery(pRequest, pQuery);
18,282,912✔
1553
      break;
18,282,912✔
1554
    case QUERY_EXEC_MODE_SCHEDULE: {
607,599,450✔
1555
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
607,599,450✔
1556
      break;
607,603,832✔
1557
    }
1558
    case QUERY_EXEC_MODE_EMPTY_RESULT:
368,226✔
1559
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
368,226✔
1560
      doRequestCallback(pRequest, 0);
368,226✔
1561
      break;
368,226✔
UNCOV
1562
    default:
×
UNCOV
1563
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
×
UNCOV
1564
      doRequestCallback(pRequest, -1);
×
UNCOV
1565
      break;
×
1566
  }
1567
}
1568

1569
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
12,032✔
1570
  SCatalog* pCatalog = NULL;
12,032✔
1571
  int32_t   code = 0;
12,032✔
1572
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
12,032✔
1573
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
12,032✔
1574

1575
  if (dbNum <= 0 && tblNum <= 0) {
12,032✔
1576
    return TSDB_CODE_APP_ERROR;
12,032✔
1577
  }
1578

UNCOV
1579
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
×
UNCOV
1580
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1581
    return code;
×
1582
  }
1583

UNCOV
1584
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
×
UNCOV
1585
                           .requestId = pRequest->requestId,
×
UNCOV
1586
                           .requestObjRefId = pRequest->self,
×
UNCOV
1587
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
×
1588

1589
  for (int32_t i = 0; i < dbNum; ++i) {
×
1590
    char* dbFName = taosArrayGet(pRequest->dbList, i);
×
1591

1592
    // catalogRefreshDBVgInfo will handle dbFName == null.
1593
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
×
1594
    if (code != TSDB_CODE_SUCCESS) {
×
1595
      return code;
×
1596
    }
1597
  }
1598

1599
  for (int32_t i = 0; i < tblNum; ++i) {
×
UNCOV
1600
    SName* tableName = taosArrayGet(pRequest->tableList, i);
×
1601

1602
    // catalogRefreshTableMeta will handle tableName == null.
1603
    code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
×
1604
    if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
1605
      return code;
×
1606
    }
1607
  }
1608

1609
  return code;
×
1610
}
1611

1612
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
4,293,527✔
1613
  SCatalog* pCatalog = NULL;
4,293,527✔
1614
  int32_t   tbNum = taosArrayGetSize(tbList);
4,293,527✔
1615
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
4,293,527✔
1616
  if (code != TSDB_CODE_SUCCESS) {
4,293,527✔
UNCOV
1617
    return code;
×
1618
  }
1619

1620
  if (isView) {
4,293,527✔
1621
    for (int32_t i = 0; i < tbNum; ++i) {
859,054✔
1622
      SName* pViewName = taosArrayGet(tbList, i);
429,527✔
1623
      char   dbFName[TSDB_DB_FNAME_LEN];
419,572✔
1624
      if (NULL == pViewName) {
429,527✔
UNCOV
1625
        continue;
×
1626
      }
1627
      (void)tNameGetFullDbName(pViewName, dbFName);
429,527✔
1628
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
429,527✔
1629
    }
1630
  } else {
1631
    for (int32_t i = 0; i < tbNum; ++i) {
5,762,545✔
1632
      SName* pTbName = taosArrayGet(tbList, i);
1,898,545✔
1633
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
1,898,545✔
1634
    }
1635
  }
1636

1637
  return TSDB_CODE_SUCCESS;
4,293,527✔
1638
}
1639

1640
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
3,407,368✔
1641
  pEpSet->version = 0;
3,407,368✔
1642

1643
  // init mnode ip set
1644
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
3,407,772✔
1645
  mgmtEpSet->numOfEps = 0;
3,408,594✔
1646
  mgmtEpSet->inUse = 0;
3,408,652✔
1647

1648
  if (firstEp && firstEp[0] != 0) {
3,408,652✔
1649
    if (strlen(firstEp) >= TSDB_EP_LEN) {
3,407,691✔
UNCOV
1650
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
UNCOV
1651
      return -1;
×
1652
    }
1653

1654
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
3,407,691✔
1655
    if (code != TSDB_CODE_SUCCESS) {
3,407,289✔
UNCOV
1656
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
UNCOV
1657
      return terrno;
×
1658
    }
1659
    // uint32_t addr = 0;
1660
    SIpAddr addr = {0};
3,407,289✔
1661
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
3,407,534✔
1662
    if (code) {
3,407,364✔
1663
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
790✔
1664
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1665
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
819✔
1666
    } else {
1667
      mgmtEpSet->numOfEps++;
3,406,574✔
1668
    }
1669
  }
1670

1671
  if (secondEp && secondEp[0] != 0) {
3,407,608✔
1672
    if (strlen(secondEp) >= TSDB_EP_LEN) {
2,214,951✔
UNCOV
1673
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
UNCOV
1674
      return terrno;
×
1675
    }
1676

1677
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
2,214,951✔
1678
    if (code != TSDB_CODE_SUCCESS) {
2,216,248✔
UNCOV
1679
      return code;
×
1680
    }
1681
    SIpAddr addr = {0};
2,216,248✔
1682
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
2,216,506✔
1683
    if (code) {
2,214,854✔
1684
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
184✔
1685
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
UNCOV
1686
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
×
1687
    } else {
1688
      mgmtEpSet->numOfEps++;
2,214,670✔
1689
    }
1690
  }
1691

1692
  if (mgmtEpSet->numOfEps == 0) {
3,408,391✔
1693
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
819✔
1694
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
819✔
1695
  }
1696

1697
  return 0;
3,405,449✔
1698
}
1699

1700
int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp, void* param,
3,409,392✔
1701
                        SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1702
  *pTscObj = NULL;
3,409,392✔
1703
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
3,409,392✔
1704
  if (TSDB_CODE_SUCCESS != code) {
3,409,392✔
UNCOV
1705
    return code;
×
1706
  }
1707

1708
  SRequestObj* pRequest = NULL;
3,409,392✔
1709
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
3,409,392✔
1710
  if (TSDB_CODE_SUCCESS != code) {
3,409,134✔
UNCOV
1711
    destroyTscObj(*pTscObj);
×
UNCOV
1712
    return code;
×
1713
  }
1714

1715
  pRequest->sqlstr = taosStrdup("taos_connect");
3,409,134✔
1716
  if (pRequest->sqlstr) {
3,408,729✔
1717
    pRequest->sqlLen = strlen(pRequest->sqlstr);
3,407,708✔
1718
  } else {
UNCOV
1719
    return terrno;
×
1720
  }
1721

1722
  SMsgSendInfo* body = NULL;
3,408,757✔
1723
  code = buildConnectMsg(pRequest, &body, totpCode);
3,408,757✔
1724
  if (TSDB_CODE_SUCCESS != code) {
3,407,986✔
UNCOV
1725
    destroyTscObj(*pTscObj);
×
UNCOV
1726
    return code;
×
1727
  }
1728

1729
  // int64_t transporterId = 0;
1730
  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
3,407,986✔
1731
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
3,406,480✔
1732
  if (TSDB_CODE_SUCCESS != code) {
3,409,288✔
UNCOV
1733
    destroyTscObj(*pTscObj);
×
1734
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1735
    return code;
×
1736
  }
1737
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
3,409,288✔
UNCOV
1738
    destroyTscObj(*pTscObj);
×
UNCOV
1739
    tscError("failed to wait sem, code:%s", terrstr());
×
UNCOV
1740
    return terrno;
×
1741
  }
1742
  if (pRequest->code != TSDB_CODE_SUCCESS) {
3,409,392✔
1743
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
17,065✔
1744
    tscError("failed to connect to server, reason: %s", errorMsg);
17,065✔
1745

1746
    terrno = pRequest->code;
17,065✔
1747
    destroyRequest(pRequest);
17,065✔
1748
    taos_close_internal(*pTscObj);
17,065✔
1749
    *pTscObj = NULL;
17,065✔
1750
    return terrno;
17,065✔
1751
  }
1752
  if (connType == CONN_TYPE__AUTH_TEST) {
3,392,327✔
NEW
1753
    terrno = TSDB_CODE_SUCCESS;
×
UNCOV
1754
    destroyRequest(pRequest);
×
NEW
1755
    taos_close_internal(*pTscObj);
×
NEW
1756
    *pTscObj = NULL;
×
NEW
1757
    return TSDB_CODE_SUCCESS;
×
1758
  }
1759

1760
  tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
3,392,327✔
1761
          (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1762
  destroyRequest(pRequest);
3,392,327✔
1763
  return code;
3,392,327✔
1764
}
1765

1766
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode) {
3,407,812✔
1767
  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
3,407,812✔
1768
  if (*pMsgSendInfo == NULL) {
3,407,914✔
1769
    return terrno;
×
1770
  }
1771

1772
  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
3,407,914✔
1773

1774
  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
3,408,447✔
1775
  (*pMsgSendInfo)->requestId = pRequest->requestId;
3,407,914✔
1776
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
3,409,109✔
1777
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
3,408,065✔
1778
  if (NULL == (*pMsgSendInfo)->param) {
3,407,429✔
UNCOV
1779
    taosMemoryFree(*pMsgSendInfo);
×
UNCOV
1780
    return terrno;
×
1781
  }
1782

1783
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
3,407,575✔
1784

1785
  SConnectReq connectReq = {0};
3,408,624✔
1786
  STscObj*    pObj = pRequest->pTscObj;
3,407,833✔
1787

1788
  char* db = getDbOfConnection(pObj);
3,407,575✔
1789
  if (db != NULL) {
3,407,417✔
1790
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
1,609,789✔
1791
  } else if (terrno) {
1,797,628✔
UNCOV
1792
    taosMemoryFree(*pMsgSendInfo);
×
UNCOV
1793
    return terrno;
×
1794
  }
1795
  taosMemoryFreeClear(db);
3,407,729✔
1796

1797
  connectReq.connType = pObj->connType;
3,409,206✔
1798
  connectReq.pid = appInfo.pid;
3,409,610✔
1799
  connectReq.startTime = appInfo.startTime;
3,408,157✔
1800
  connectReq.totpCode = totpCode;
3,408,157✔
1801

1802
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
3,408,157✔
1803
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
3,408,157✔
1804
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
3,408,561✔
1805
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
3,407,366✔
1806

1807
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
3,407,366✔
1808
  void*   pReq = taosMemoryMalloc(contLen);
3,408,093✔
1809
  if (NULL == pReq) {
3,406,567✔
1810
    taosMemoryFree(*pMsgSendInfo);
×
UNCOV
1811
    return terrno;
×
1812
  }
1813

1814
  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
3,406,567✔
UNCOV
1815
    taosMemoryFree(*pMsgSendInfo);
×
UNCOV
1816
    taosMemoryFree(pReq);
×
UNCOV
1817
    return terrno;
×
1818
  }
1819

1820
  (*pMsgSendInfo)->msgInfo.len = contLen;
3,406,456✔
1821
  (*pMsgSendInfo)->msgInfo.pData = pReq;
3,408,700✔
1822
  return TSDB_CODE_SUCCESS;
3,407,118✔
1823
}
1824

1825
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
1,070,425,371✔
1826
  if (NULL == pEpSet) {
1,070,425,371✔
1827
    return;
1,057,120,908✔
1828
  }
1829

1830
  switch (pSendInfo->target.type) {
13,304,463✔
1831
    case TARGET_TYPE_MNODE:
984✔
1832
      if (NULL == pTscObj) {
984✔
1833
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1834
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
UNCOV
1835
        return;
×
1836
      }
1837

1838
      SEpSet  originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
984✔
1839
      SEpSet* pOrig = &originEpset;
984✔
1840
      SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
984✔
1841
      SEp*    pNewEp = &pEpSet->eps[pEpSet->inUse];
984✔
1842
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
984✔
1843
               pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
1844
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
984✔
1845
      break;
8,967,908✔
1846
    case TARGET_TYPE_VNODE: {
13,052,673✔
1847
      if (NULL == pTscObj) {
13,052,936✔
UNCOV
1848
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1849
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1850
        return;
×
1851
      }
1852

1853
      SCatalog* pCatalog = NULL;
13,052,936✔
1854
      int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
13,052,936✔
1855
      if (code != TSDB_CODE_SUCCESS) {
13,053,192✔
UNCOV
1856
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1857
                 tstrerror(code));
UNCOV
1858
        return;
×
1859
      }
1860

1861
      code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
13,053,192✔
1862
      if (code != TSDB_CODE_SUCCESS) {
13,054,649✔
UNCOV
1863
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1864
                 tstrerror(code));
1865
        return;
×
1866
      }
1867
      taosMemoryFreeClear(pSendInfo->target.dbFName);
13,054,649✔
1868
      break;
13,054,401✔
1869
    }
1870
    default:
254,338✔
1871
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
254,338✔
1872
      break;
254,022✔
1873
  }
1874
}
1875

1876
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
1,071,037,118✔
1877
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
1,071,037,118✔
1878
  if (pMsg->info.ahandle == NULL) {
1,071,038,012✔
1879
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
608,190✔
1880
    rpcFreeCont(pMsg->pCont);
608,190✔
1881
    taosMemoryFree(pEpSet);
608,190✔
1882
    return TSDB_CODE_TSC_INTERNAL_ERROR;
608,190✔
1883
  }
1884

1885
  STscObj* pTscObj = NULL;
1,070,428,665✔
1886

1887
  STraceId* trace = &pMsg->info.traceId;
1,070,428,665✔
1888
  char      tbuf[40] = {0};
1,070,430,990✔
1889
  TRACE_TO_STR(trace, tbuf);
1,070,430,540✔
1890

1891
  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
1,070,433,502✔
1892
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));
1893

1894
  if (pSendInfo->requestObjRefId != 0) {
1,070,433,221✔
1895
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
918,954,516✔
1896
    if (pRequest) {
918,952,467✔
1897
      if (pRequest->self != pSendInfo->requestObjRefId) {
918,687,116✔
UNCOV
1898
        tscError("doProcessMsgFromServer req:0x%" PRId64 " != pSendInfo->requestObjRefId:0x%" PRId64, pRequest->self,
×
1899
                 pSendInfo->requestObjRefId);
1900

UNCOV
1901
        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
×
UNCOV
1902
          tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1903
        }
UNCOV
1904
        rpcFreeCont(pMsg->pCont);
×
UNCOV
1905
        taosMemoryFree(pEpSet);
×
UNCOV
1906
        destroySendMsgInfo(pSendInfo);
×
UNCOV
1907
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1908
      }
1909
      pTscObj = pRequest->pTscObj;
918,687,601✔
1910
    }
1911
  }
1912

1913
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
1,070,428,745✔
1914

1915
  SDataBuf buf = {.msgType = pMsg->msgType,
1,070,428,617✔
1916
                  .len = pMsg->contLen,
1,070,429,382✔
1917
                  .pData = NULL,
1918
                  .handle = pMsg->info.handle,
1,070,431,021✔
1919
                  .handleRefId = pMsg->info.refId,
1,070,430,917✔
1920
                  .pEpSet = pEpSet};
1921

1922
  if (pMsg->contLen > 0) {
1,070,431,527✔
1923
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
1,045,731,481✔
1924
    if (buf.pData == NULL) {
1,045,728,524✔
UNCOV
1925
      pMsg->code = terrno;
×
1926
    } else {
1927
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
1,045,728,524✔
1928
    }
1929
  }
1930

1931
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
1,070,431,209✔
1932

1933
  if (pTscObj) {
1,070,418,482✔
1934
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
918,677,827✔
1935
    if (TSDB_CODE_SUCCESS != code) {
918,686,635✔
UNCOV
1936
      tscError("doProcessMsgFromServer taosReleaseRef failed");
×
UNCOV
1937
      terrno = code;
×
UNCOV
1938
      pMsg->code = code;
×
1939
    }
1940
  }
1941

1942
  rpcFreeCont(pMsg->pCont);
1,070,427,290✔
1943
  destroySendMsgInfo(pSendInfo);
1,070,419,938✔
1944
  return TSDB_CODE_SUCCESS;
1,070,407,586✔
1945
}
1946

1947
int32_t doProcessMsgFromServer(void* param) {
1,071,040,623✔
1948
  AsyncArg* arg = (AsyncArg*)param;
1,071,040,623✔
1949
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
1,071,040,623✔
1950
  taosMemoryFree(arg);
1,071,013,608✔
1951
  return code;
1,071,024,119✔
1952
}
1953

1954
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
1,071,028,351✔
1955
  int32_t code = 0;
1,071,028,351✔
1956
  SEpSet* tEpSet = NULL;
1,071,028,351✔
1957

1958
  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);
1,071,028,351✔
1959

1960
  if (pEpSet != NULL) {
1,071,035,483✔
1961
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
13,308,554✔
1962
    if (NULL == tEpSet) {
13,307,762✔
UNCOV
1963
      code = terrno;
×
UNCOV
1964
      pMsg->code = terrno;
×
UNCOV
1965
      goto _exit;
×
1966
    }
1967
    (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
13,307,762✔
1968
  }
1969

1970
  // pMsg is response msg
1971
  if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
1,071,034,691✔
1972
    // restore origin code
1973
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
3,408,988✔
UNCOV
1974
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
×
1975
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
3,408,988✔
UNCOV
1976
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
×
1977
    }
1978
  } else {
1979
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
1980
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
1,067,623,758✔
1981
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
×
1982
    }
1983
  }
1984

1985
  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
1,071,035,911✔
1986
  if (NULL == arg) {
1,071,028,303✔
UNCOV
1987
    code = terrno;
×
UNCOV
1988
    pMsg->code = code;
×
UNCOV
1989
    goto _exit;
×
1990
  }
1991

1992
  arg->msg = *pMsg;
1,071,028,303✔
1993
  arg->pEpset = tEpSet;
1,071,030,670✔
1994

1995
  if ((code = taosAsyncExec(doProcessMsgFromServer, arg, NULL)) != 0) {
1,071,032,312✔
1996
    pMsg->code = code;
642✔
1997
    taosMemoryFree(arg);
642✔
1998
    goto _exit;
×
1999
  }
2000
  return;
1,071,035,933✔
2001

UNCOV
2002
_exit:
×
UNCOV
2003
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
×
2004
  code = doProcessMsgFromServerImpl(pMsg, tEpSet);
×
2005
  if (code != 0) {
×
2006
    tscError("failed to sched msg to tsc, tsc ready quit");
×
2007
  }
2008
}
2009

2010

2011

NEW
2012
TAOS *taos_connect_totp(const char *ip, const char *user, const char *pass, const char* totp, const char *db, uint16_t port) {
×
NEW
2013
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
×
NEW
2014
  if (user == NULL) {
×
NEW
2015
    user = TSDB_DEFAULT_USER;
×
2016
  }
2017

NEW
2018
  if (pass == NULL) {
×
NEW
2019
    pass = TSDB_DEFAULT_PASS;
×
2020
  }
2021

NEW
2022
  STscObj *pObj = NULL;
×
NEW
2023
  int32_t  code = taos_connect_internal(ip, user, pass, NULL, totp, db, port, CONN_TYPE__QUERY, &pObj);
×
NEW
2024
  if (TSDB_CODE_SUCCESS == code) {
×
NEW
2025
    int64_t *rid = taosMemoryCalloc(1, sizeof(int64_t));
×
NEW
2026
    if (NULL == rid) {
×
NEW
2027
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
NEW
2028
      return NULL;
×
2029
    }
NEW
2030
    *rid = pObj->id;
×
NEW
2031
    return (TAOS *)rid;
×
2032
  } else {
NEW
2033
    terrno = code;
×
2034
  }
2035

NEW
2036
  return NULL;
×
2037
}
2038

2039

NEW
2040
int taos_connect_test(const char *ip, const char *user, const char *pass, const char* totp, const char *db, uint16_t port) {
×
NEW
2041
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
×
NEW
2042
  if (user == NULL) {
×
NEW
2043
    user = TSDB_DEFAULT_USER;
×
2044
  }
2045

NEW
2046
  if (pass == NULL) {
×
NEW
2047
    pass = TSDB_DEFAULT_PASS;
×
2048
  }
2049

NEW
2050
  STscObj *pObj = NULL;
×
NEW
2051
  return taos_connect_internal(ip, user, pass, NULL, totp, db, port, CONN_TYPE__AUTH_TEST, &pObj);
×
2052
}
2053

2054

NEW
2055
TAOS *taos_connect_token(const char *ip, const char *token, const char *db, uint16_t port) {
×
NEW
2056
  return NULL;
×
2057
}
2058

2059

2060
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
834✔
2061
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
834✔
2062
  if (user == NULL) {
834✔
UNCOV
2063
    user = TSDB_DEFAULT_USER;
×
2064
  }
2065

2066
  if (auth == NULL) {
834✔
UNCOV
2067
    tscError("No auth info is given, failed to connect to server");
×
UNCOV
2068
    return NULL;
×
2069
  }
2070

2071
  STscObj* pObj = NULL;
834✔
2072
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, NULL, db, port, CONN_TYPE__QUERY, &pObj);
834✔
2073
  if (TSDB_CODE_SUCCESS == code) {
834✔
2074
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
146✔
2075
    if (NULL == rid) {
146✔
UNCOV
2076
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2077
    }
2078
    *rid = pObj->id;
146✔
2079
    return (TAOS*)rid;
146✔
2080
  }
2081

2082
  return NULL;
688✔
2083
}
2084

2085
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
2086
//                      const char* db, int dbLen, uint16_t port) {
2087
//   char ipStr[TSDB_EP_LEN] = {0};
2088
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
2089
//   char userStr[TSDB_USER_LEN] = {0};
2090
//   char passStr[TSDB_PASSWORD_LEN] = {0};
2091
//
2092
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
2093
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
2094
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
2095
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
2096
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
2097
// }
2098

2099
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
2,147,483,647✔
2100
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,147,483,647✔
2101
    SResultColumn* pCol = &pResultInfo->pCol[i];
2,147,483,647✔
2102

2103
    int32_t type = pResultInfo->fields[i].type;
2,147,483,647✔
2104
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->userFields[i].bytes, false);
2,147,483,647✔
2105

2106
    if (IS_VAR_DATA_TYPE(type)) {
2,147,483,647✔
2107
      if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pCol->offset[pResultInfo->current] != -1) {
2,147,483,647✔
2108
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
2,147,483,647✔
2109

2110
        if (IS_STR_DATA_BLOB(type)) {
2,147,483,647✔
UNCOV
2111
          pResultInfo->length[i] = blobDataLen(pStart);
×
UNCOV
2112
          pResultInfo->row[i] = blobDataVal(pStart);
×
2113
        } else {
2114
          pResultInfo->length[i] = varDataLen(pStart);
2,147,483,647✔
2115
          pResultInfo->row[i] = varDataVal(pStart);
2,147,483,647✔
2116
        }
2117
      } else {
2118
        pResultInfo->row[i] = NULL;
73,858,615✔
2119
        pResultInfo->length[i] = 0;
73,929,804✔
2120
      }
2121
    } else {
2122
      if (!colDataIsNull_f(pCol, pResultInfo->current)) {
2,147,483,647✔
2123
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + schemaBytes * pResultInfo->current;
2,147,483,647✔
2124
        pResultInfo->length[i] = schemaBytes;
2,147,483,647✔
2125
      } else {
2126
        pResultInfo->row[i] = NULL;
328,321,630✔
2127
        pResultInfo->length[i] = 0;
329,222,507✔
2128
      }
2129
    }
2130
  }
2131
}
2,147,483,647✔
2132

UNCOV
2133
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
×
2134
  if (pRequest == NULL) {
×
2135
    return NULL;
×
2136
  }
2137

UNCOV
2138
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
×
UNCOV
2139
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
×
2140
    // All data has returned to App already, no need to try again
2141
    if (pResultInfo->completed) {
×
2142
      pResultInfo->numOfRows = 0;
×
2143
      return NULL;
×
2144
    }
2145

2146
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
×
UNCOV
2147
    SSchedulerReq   req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
×
2148

UNCOV
2149
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
×
UNCOV
2150
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
UNCOV
2151
      pResultInfo->numOfRows = 0;
×
UNCOV
2152
      return NULL;
×
2153
    }
2154

UNCOV
2155
    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData,
×
UNCOV
2156
                                           convertUcs4, pRequest->stmtBindVersion > 0);
×
UNCOV
2157
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
UNCOV
2158
      pResultInfo->numOfRows = 0;
×
UNCOV
2159
      return NULL;
×
2160
    }
2161

UNCOV
2162
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
×
2163
             ", complete:%d, QID:0x%" PRIx64,
2164
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
2165

UNCOV
2166
    STscObj*            pTscObj = pRequest->pTscObj;
×
UNCOV
2167
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
×
UNCOV
2168
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
×
2169

UNCOV
2170
    if (pResultInfo->numOfRows == 0) {
×
UNCOV
2171
      return NULL;
×
2172
    }
2173
  }
2174

UNCOV
2175
  if (setupOneRowPtr) {
×
UNCOV
2176
    doSetOneRowPtr(pResultInfo);
×
UNCOV
2177
    pResultInfo->current += 1;
×
2178
  }
2179

UNCOV
2180
  return pResultInfo->row;
×
2181
}
2182

2183
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
101,764,135✔
2184
  tsem_t* sem = param;
101,764,135✔
2185
  if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
101,764,135✔
UNCOV
2186
    tscError("failed to post sem, code:%s", terrstr());
×
2187
  }
2188
}
101,764,393✔
2189

2190
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
1,222,465,510✔
2191
  if (pRequest == NULL) {
1,222,465,510✔
UNCOV
2192
    return NULL;
×
2193
  }
2194

2195
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,222,465,510✔
2196
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
1,222,488,505✔
2197
    // All data has returned to App already, no need to try again
2198
    if (pResultInfo->completed) {
178,051,187✔
2199
      pResultInfo->numOfRows = 0;
76,304,984✔
2200
      return NULL;
76,305,230✔
2201
    }
2202

2203
    // convert ucs4 to native multi-bytes string
2204
    pResultInfo->convertUcs4 = convertUcs4;
101,764,381✔
2205
    tsem_t sem;
91,127,713✔
2206
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
101,764,131✔
UNCOV
2207
      tscError("failed to init sem, code:%s", terrstr());
×
2208
    }
2209
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
101,763,594✔
2210
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
101,764,393✔
UNCOV
2211
      tscError("failed to wait sem, code:%s", terrstr());
×
2212
    }
2213
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
101,764,187✔
2214
      tscError("failed to destroy sem, code:%s", terrstr());
×
2215
    }
2216
    pRequest->inCallback = false;
101,764,393✔
2217
  }
2218

2219
  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
1,146,220,313✔
2220
    return NULL;
7,278,382✔
2221
  } else {
2222
    if (setupOneRowPtr) {
1,138,933,394✔
2223
      doSetOneRowPtr(pResultInfo);
1,046,141,893✔
2224
      pResultInfo->current += 1;
1,046,148,921✔
2225
    }
2226

2227
    return pResultInfo->row;
1,138,938,135✔
2228
  }
2229
}
2230

2231
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
138,819,160✔
2232
  if (pResInfo->row == NULL) {
138,819,160✔
2233
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
118,619,162✔
2234
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
118,619,739✔
2235
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
118,616,544✔
2236
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
118,617,858✔
2237

2238
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
118,618,621✔
2239
      taosMemoryFree(pResInfo->row);
2,508✔
UNCOV
2240
      taosMemoryFree(pResInfo->pCol);
×
UNCOV
2241
      taosMemoryFree(pResInfo->length);
×
2242
      taosMemoryFree(pResInfo->convertBuf);
×
2243
      return terrno;
×
2244
    }
2245
  }
2246

2247
  return TSDB_CODE_SUCCESS;
138,820,743✔
2248
}
2249

2250
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
137,201,443✔
2251
  int32_t idx = -1;
137,201,443✔
2252
  iconv_t conv = taosAcquireConv(&idx, C2M, pResultInfo->charsetCxt);
137,201,655✔
2253
  if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
137,197,361✔
2254

2255
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
764,517,880✔
2256
    int32_t type = pResultInfo->fields[i].type;
627,326,289✔
2257
    int32_t schemaBytes =
2258
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
627,327,416✔
2259

2260
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
627,323,935✔
2261
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
22,792,691✔
2262
      if (p == NULL) {
22,792,778✔
UNCOV
2263
        taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
×
UNCOV
2264
        return terrno;
×
2265
      }
2266

2267
      pResultInfo->convertBuf[i] = p;
22,792,778✔
2268

2269
      SResultColumn* pCol = &pResultInfo->pCol[i];
22,792,778✔
2270
      for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
2,147,483,647✔
2271
        if (pCol->offset[j] != -1) {
2,147,483,647✔
2272
          char* pStart = pCol->offset[j] + pCol->pData;
2,147,483,647✔
2273

2274
          int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p), conv);
2,147,483,647✔
2275
          if (len < 0 || len > schemaBytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
2,147,483,647✔
2276
            tscError(
331✔
2277
                "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
2278
                "colLength[i]):%p",
2279
                len, schemaBytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
2280
            taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
331✔
2281
            return TSDB_CODE_TSC_INTERNAL_ERROR;
331✔
2282
          }
2283

2284
          varDataSetLen(p, len);
2,147,483,647✔
2285
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
2,147,483,647✔
2286
          p += (len + VARSTR_HEADER_SIZE);
2,147,483,647✔
2287
        }
2288
      }
2289

2290
      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
22,792,447✔
2291
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
22,792,447✔
2292
    }
2293
  }
2294
  taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
137,200,571✔
2295
  return TSDB_CODE_SUCCESS;
137,200,931✔
2296
}
2297

2298
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
137,198,545✔
2299
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
764,512,289✔
2300
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + i;
627,322,555✔
2301
    TAOS_FIELD*   pField = pResultInfo->userFields + i;
627,318,314✔
2302
    int32_t       type = pFieldE->type;
627,317,702✔
2303
    int32_t       bufLen = 0;
627,321,198✔
2304
    char*         p = NULL;
627,321,198✔
2305
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[i].pData) {
627,321,198✔
2306
      continue;
625,644,939✔
2307
    } else {
2308
      bufLen = 64;
1,668,864✔
2309
      p = taosMemoryRealloc(pResultInfo->convertBuf[i], bufLen * pResultInfo->numOfRows);
1,668,864✔
2310
      pFieldE->bytes = bufLen;
1,668,864✔
2311
      pField->bytes = bufLen;
1,668,864✔
2312
    }
2313
    if (!p) return terrno;
1,668,864✔
2314
    pResultInfo->convertBuf[i] = p;
1,668,864✔
2315

2316
    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
1,024,974,347✔
2317
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[i].pData + j * tDataTypes[type].bytes), type,
1,023,305,483✔
2318
                                  pFieldE->precision, pFieldE->scale, p, bufLen);
1,023,305,483✔
2319
      p += bufLen;
1,023,305,483✔
2320
      if (TSDB_CODE_SUCCESS != code) {
1,023,305,483✔
UNCOV
2321
        return code;
×
2322
      }
2323
    }
2324
    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
1,668,864✔
2325
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
1,668,864✔
2326
  }
2327
  return 0;
137,195,640✔
2328
}
2329

2330
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
391,906✔
2331
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t) +
783,812✔
2332
         numOfCols * (sizeof(int8_t) + sizeof(int32_t));
391,906✔
2333
}
2334

2335
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
195,953✔
2336
  char*   p = (char*)pResultInfo->pData;
195,953✔
2337
  int32_t blockVersion = *(int32_t*)p;
195,953✔
2338

2339
  int32_t numOfRows = pResultInfo->numOfRows;
195,953✔
2340
  int32_t numOfCols = pResultInfo->numOfCols;
195,953✔
2341

2342
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2343
  // length |
2344
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
195,953✔
2345
  if (numOfCols != cols) {
195,953✔
UNCOV
2346
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
UNCOV
2347
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2348
  }
2349

2350
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
195,953✔
2351
  int32_t* colLength = (int32_t*)(p + len);
195,953✔
2352
  len += sizeof(int32_t) * numOfCols;
195,953✔
2353

2354
  char* pStart = p + len;
195,953✔
2355
  for (int32_t i = 0; i < numOfCols; ++i) {
851,950✔
2356
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
655,997✔
2357

2358
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
655,997✔
2359
      int32_t* offset = (int32_t*)pStart;
232,620✔
2360
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
232,620✔
2361
      len += lenTmp;
232,620✔
2362
      pStart += lenTmp;
232,620✔
2363

2364
      int32_t estimateColLen = 0;
232,620✔
2365
      for (int32_t j = 0; j < numOfRows; ++j) {
1,222,025✔
2366
        if (offset[j] == -1) {
989,405✔
2367
          continue;
49,518✔
2368
        }
2369
        char* data = offset[j] + pStart;
939,887✔
2370

2371
        int32_t jsonInnerType = *data;
939,887✔
2372
        char*   jsonInnerData = data + CHAR_BYTES;
939,887✔
2373
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
939,887✔
2374
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
13,032✔
2375
        } else if (tTagIsJson(data)) {
926,855✔
2376
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
215,698✔
2377
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
711,157✔
2378
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
662,287✔
2379
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
48,870✔
2380
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
35,838✔
2381
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
13,032✔
2382
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
13,032✔
UNCOV
2383
        } else if (IS_STR_DATA_BLOB(jsonInnerType)) {
×
UNCOV
2384
          estimateColLen += (BLOBSTR_HEADER_SIZE + 32);
×
2385
        } else {
UNCOV
2386
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
UNCOV
2387
          return -1;
×
2388
        }
2389
      }
2390
      len += TMAX(colLen, estimateColLen);
232,620✔
2391
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
423,377✔
2392
      int32_t lenTmp = numOfRows * sizeof(int32_t);
54,300✔
2393
      len += (lenTmp + colLen);
54,300✔
2394
      pStart += lenTmp;
54,300✔
2395
    } else {
2396
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
369,077✔
2397
      len += (lenTmp + colLen);
369,077✔
2398
      pStart += lenTmp;
369,077✔
2399
    }
2400
    pStart += colLen;
655,997✔
2401
  }
2402

2403
  // Ensure the complete structure of the block, including the blankfill field,
2404
  // even though it is not used on the client side.
2405
  len += sizeof(bool);
195,953✔
2406
  return len;
195,953✔
2407
}
2408

2409
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
138,819,221✔
2410
  int32_t numOfRows = pResultInfo->numOfRows;
138,819,221✔
2411
  int32_t numOfCols = pResultInfo->numOfCols;
138,819,785✔
2412
  bool    needConvert = false;
138,822,540✔
2413
  for (int32_t i = 0; i < numOfCols; ++i) {
775,559,986✔
2414
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
636,933,515✔
2415
      needConvert = true;
195,953✔
2416
      break;
195,953✔
2417
    }
2418
  }
2419

2420
  if (!needConvert) {
138,822,424✔
2421
    return TSDB_CODE_SUCCESS;
138,626,471✔
2422
  }
2423

2424
  tscDebug("start to convert form json format string");
195,953✔
2425

2426
  char*   p = (char*)pResultInfo->pData;
195,953✔
2427
  int32_t blockVersion = *(int32_t*)p;
195,953✔
2428
  int32_t dataLen = estimateJsonLen(pResultInfo);
195,953✔
2429
  if (dataLen <= 0) {
195,953✔
UNCOV
2430
    tscError("doConvertJson error: estimateJsonLen failed");
×
UNCOV
2431
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2432
  }
2433

2434
  taosMemoryFreeClear(pResultInfo->convertJson);
195,953✔
2435
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
195,953✔
2436
  if (pResultInfo->convertJson == NULL) return terrno;
195,953✔
2437
  char* p1 = pResultInfo->convertJson;
195,953✔
2438

2439
  int32_t totalLen = 0;
195,953✔
2440
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
195,953✔
2441
  if (numOfCols != cols) {
195,953✔
UNCOV
2442
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
×
UNCOV
2443
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2444
  }
2445

2446
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
195,953✔
2447
  (void)memcpy(p1, p, len);
195,953✔
2448

2449
  p += len;
195,953✔
2450
  p1 += len;
195,953✔
2451
  totalLen += len;
195,953✔
2452

2453
  len = sizeof(int32_t) * numOfCols;
195,953✔
2454
  int32_t* colLength = (int32_t*)p;
195,953✔
2455
  int32_t* colLength1 = (int32_t*)p1;
195,953✔
2456
  (void)memcpy(p1, p, len);
195,953✔
2457
  p += len;
195,953✔
2458
  p1 += len;
195,953✔
2459
  totalLen += len;
195,953✔
2460

2461
  char* pStart = p;
195,953✔
2462
  char* pStart1 = p1;
195,953✔
2463
  for (int32_t i = 0; i < numOfCols; ++i) {
851,950✔
2464
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
655,997✔
2465
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
655,997✔
2466
    if (colLen >= dataLen) {
655,997✔
UNCOV
2467
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
×
UNCOV
2468
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2469
    }
2470
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
655,997✔
2471
      int32_t* offset = (int32_t*)pStart;
232,620✔
2472
      int32_t* offset1 = (int32_t*)pStart1;
232,620✔
2473
      len = numOfRows * sizeof(int32_t);
232,620✔
2474
      (void)memcpy(pStart1, pStart, len);
232,620✔
2475
      pStart += len;
232,620✔
2476
      pStart1 += len;
232,620✔
2477
      totalLen += len;
232,620✔
2478

2479
      len = 0;
232,620✔
2480
      for (int32_t j = 0; j < numOfRows; ++j) {
1,222,025✔
2481
        if (offset[j] == -1) {
989,405✔
2482
          continue;
49,518✔
2483
        }
2484
        char* data = offset[j] + pStart;
939,887✔
2485

2486
        int32_t jsonInnerType = *data;
939,887✔
2487
        char*   jsonInnerData = data + CHAR_BYTES;
939,887✔
2488
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
939,887✔
2489
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
939,887✔
2490
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
13,032✔
2491
          varDataSetLen(dst, strlen(varDataVal(dst)));
13,032✔
2492
        } else if (tTagIsJson(data)) {
926,855✔
2493
          char* jsonString = NULL;
215,698✔
2494
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
215,698✔
2495
          if (jsonString == NULL) {
215,698✔
UNCOV
2496
            tscError("doConvertJson error: parseTagDatatoJson failed");
×
2497
            return terrno;
×
2498
          }
2499
          STR_TO_VARSTR(dst, jsonString);
215,698✔
2500
          taosMemoryFree(jsonString);
215,698✔
2501
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
711,157✔
2502
          *(char*)varDataVal(dst) = '\"';
662,287✔
2503
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
662,287✔
2504
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
662,287✔
2505
                                         varDataVal(tmp), pResultInfo->charsetCxt);
2506
          if (length <= 0) {
662,287✔
2507
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
543✔
2508
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
2509
            length = 0;
543✔
2510
          }
2511
          int32_t escapeLength = escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - CHAR_BYTES * 2,varDataVal(tmp), length);
662,287✔
2512
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
662,287✔
2513
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
662,287✔
2514
          tscError("value:%s.", varDataVal(dst));
662,287✔
2515
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
48,870✔
2516
          double jsonVd = *(double*)(jsonInnerData);
35,838✔
2517
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
35,838✔
2518
          varDataSetLen(dst, strlen(varDataVal(dst)));
35,838✔
2519
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
13,032✔
2520
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
13,032✔
2521
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
13,032✔
2522
          varDataSetLen(dst, strlen(varDataVal(dst)));
13,032✔
2523
        } else {
UNCOV
2524
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
×
UNCOV
2525
          return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2526
        }
2527

2528
        offset1[j] = len;
939,887✔
2529
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
939,887✔
2530
        len += varDataTLen(dst);
939,887✔
2531
      }
2532
      colLen1 = len;
232,620✔
2533
      totalLen += colLen1;
232,620✔
2534
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
232,620✔
2535
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
423,377✔
2536
      len = numOfRows * sizeof(int32_t);
54,300✔
2537
      (void)memcpy(pStart1, pStart, len);
54,300✔
2538
      pStart += len;
54,300✔
2539
      pStart1 += len;
54,300✔
2540
      totalLen += len;
54,300✔
2541
      totalLen += colLen;
54,300✔
2542
      (void)memcpy(pStart1, pStart, colLen);
54,300✔
2543
    } else {
2544
      len = BitmapLen(pResultInfo->numOfRows);
369,077✔
2545
      (void)memcpy(pStart1, pStart, len);
369,077✔
2546
      pStart += len;
369,077✔
2547
      pStart1 += len;
369,077✔
2548
      totalLen += len;
369,077✔
2549
      totalLen += colLen;
369,077✔
2550
      (void)memcpy(pStart1, pStart, colLen);
369,077✔
2551
    }
2552
    pStart += colLen;
655,997✔
2553
    pStart1 += colLen1;
655,997✔
2554
  }
2555

2556
  // Ensure the complete structure of the block, including the blankfill field,
2557
  // even though it is not used on the client side.
2558
  // (void)memcpy(pStart1, pStart, sizeof(bool));
2559
  totalLen += sizeof(bool);
195,953✔
2560

2561
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
195,953✔
2562
  pResultInfo->pData = pResultInfo->convertJson;
195,953✔
2563
  return TSDB_CODE_SUCCESS;
195,953✔
2564
}
2565

2566
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
146,275,099✔
2567
  bool convertForDecimal = convertUcs4;
146,275,099✔
2568
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
146,275,099✔
2569
    tscError("setResultDataPtr paras error");
604✔
UNCOV
2570
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2571
  }
2572

2573
  if (pResultInfo->numOfRows == 0) {
146,274,311✔
2574
    return TSDB_CODE_SUCCESS;
7,453,684✔
2575
  }
2576

2577
  if (pResultInfo->pData == NULL) {
138,821,704✔
UNCOV
2578
    tscError("setResultDataPtr error: pData is NULL");
×
UNCOV
2579
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2580
  }
2581

2582
  int32_t code = doPrepareResPtr(pResultInfo);
138,819,479✔
2583
  if (code != TSDB_CODE_SUCCESS) {
138,820,285✔
UNCOV
2584
    return code;
×
2585
  }
2586
  code = doConvertJson(pResultInfo);
138,820,285✔
2587
  if (code != TSDB_CODE_SUCCESS) {
138,819,142✔
UNCOV
2588
    return code;
×
2589
  }
2590

2591
  char* p = (char*)pResultInfo->pData;
138,819,142✔
2592

2593
  // version:
2594
  int32_t blockVersion = *(int32_t*)p;
138,819,611✔
2595
  p += sizeof(int32_t);
138,820,772✔
2596

2597
  int32_t dataLen = *(int32_t*)p;
138,820,560✔
2598
  p += sizeof(int32_t);
138,821,090✔
2599

2600
  int32_t rows = *(int32_t*)p;
138,820,498✔
2601
  p += sizeof(int32_t);
138,820,386✔
2602

2603
  int32_t cols = *(int32_t*)p;
138,820,717✔
2604
  p += sizeof(int32_t);
138,820,647✔
2605

2606
  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
138,821,461✔
2607
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
842✔
2608
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
UNCOV
2609
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2610
  }
2611

2612
  int32_t hasColumnSeg = *(int32_t*)p;
138,820,279✔
2613
  p += sizeof(int32_t);
138,821,610✔
2614

2615
  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)p);
138,821,638✔
2616
  p += sizeof(uint64_t);
138,821,638✔
2617

2618
  // check fields
2619
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
775,804,359✔
2620
    int8_t type = *(int8_t*)p;
636,988,110✔
2621
    p += sizeof(int8_t);
636,984,277✔
2622

2623
    int32_t bytes = *(int32_t*)p;
636,987,928✔
2624
    p += sizeof(int32_t);
636,987,482✔
2625

2626
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
636,984,384✔
2627
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
312,330✔
2628
    }
2629
  }
2630

2631
  int32_t* colLength = (int32_t*)p;
138,821,292✔
2632
  p += sizeof(int32_t) * pResultInfo->numOfCols;
138,821,292✔
2633

2634
  char* pStart = p;
138,821,571✔
2635
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
775,813,566✔
2636
    if ((pStart - pResultInfo->pData) >= dataLen) {
636,991,541✔
2637
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
×
UNCOV
2638
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2639
    }
2640
    if (blockVersion == BLOCK_VERSION_1) {
636,985,860✔
2641
      colLength[i] = htonl(colLength[i]);
455,634,414✔
2642
    }
2643
    if (colLength[i] >= dataLen) {
636,986,279✔
UNCOV
2644
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
×
2645
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2646
    }
2647
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
636,985,451✔
2648
      tscError("invalid type %d", pResultInfo->fields[i].type);
2,605✔
UNCOV
2649
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2650
    }
2651
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
636,988,813✔
2652
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
156,085,810✔
2653
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
156,086,061✔
2654
    } else {
2655
      pResultInfo->pCol[i].nullbitmap = pStart;
480,907,852✔
2656
      pStart += BitmapLen(pResultInfo->numOfRows);
480,911,129✔
2657
    }
2658

2659
    pResultInfo->pCol[i].pData = pStart;
636,995,645✔
2660
    pResultInfo->length[i] =
1,273,987,754✔
2661
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
1,206,175,887✔
2662
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
636,993,351✔
2663

2664
    pStart += colLength[i];
636,992,050✔
2665
  }
2666

2667
  p = pStart;
138,823,036✔
2668
  // bool blankFill = *(bool*)p;
2669
  p += sizeof(bool);
138,823,036✔
2670
  int32_t offset = p - pResultInfo->pData;
138,823,192✔
2671
  if (offset > dataLen) {
138,822,188✔
UNCOV
2672
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
×
UNCOV
2673
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2674
  }
2675

2676
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
2677
  if (convertUcs4) {
138,822,188✔
2678
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
137,201,503✔
2679
  }
2680
#endif
2681
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
138,822,160✔
2682
    code = convertDecimalType(pResultInfo);
137,201,356✔
2683
  }
2684
  return code;
138,809,905✔
2685
}
2686

2687
char* getDbOfConnection(STscObj* pObj) {
660,551,812✔
2688
  terrno = TSDB_CODE_SUCCESS;
660,551,812✔
2689
  char* p = NULL;
660,559,608✔
2690
  (void)taosThreadMutexLock(&pObj->mutex);
660,559,608✔
2691
  size_t len = strlen(pObj->db);
660,561,510✔
2692
  if (len > 0) {
660,561,851✔
2693
    p = taosStrndup(pObj->db, tListLen(pObj->db));
590,234,342✔
2694
    if (p == NULL) {
590,229,652✔
UNCOV
2695
      tscError("failed to taosStrndup db name");
×
2696
    }
2697
  }
2698

2699
  (void)taosThreadMutexUnlock(&pObj->mutex);
660,557,161✔
2700
  return p;
660,550,346✔
2701
}
2702

2703
void setConnectionDB(STscObj* pTscObj, const char* db) {
3,092,734✔
2704
  if (db == NULL || pTscObj == NULL) {
3,092,734✔
2705
    tscError("setConnectionDB para is NULL");
×
UNCOV
2706
    return;
×
2707
  }
2708

2709
  (void)taosThreadMutexLock(&pTscObj->mutex);
3,092,734✔
2710
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
3,092,631✔
2711
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
3,092,631✔
2712
}
2713

UNCOV
2714
void resetConnectDB(STscObj* pTscObj) {
×
UNCOV
2715
  if (pTscObj == NULL) {
×
2716
    return;
×
2717
  }
2718

UNCOV
2719
  (void)taosThreadMutexLock(&pTscObj->mutex);
×
UNCOV
2720
  pTscObj->db[0] = 0;
×
UNCOV
2721
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
×
2722
}
2723

2724
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
107,197,095✔
2725
                              bool isStmt) {
2726
  if (pResultInfo == NULL || pRsp == NULL) {
107,197,095✔
2727
    tscError("setQueryResultFromRsp paras is null");
256✔
UNCOV
2728
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2729
  }
2730

2731
  taosMemoryFreeClear(pResultInfo->pRspMsg);
107,196,839✔
2732
  pResultInfo->pRspMsg = (const char*)pRsp;
107,196,321✔
2733
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
107,197,095✔
2734
  pResultInfo->current = 0;
107,197,095✔
2735
  pResultInfo->completed = (pRsp->completed == 1);
107,197,095✔
2736
  pResultInfo->precision = pRsp->precision;
107,196,581✔
2737

2738
  // decompress data if needed
2739
  int32_t payloadLen = htonl(pRsp->payloadLen);
107,196,581✔
2740

2741
  if (pRsp->compressed) {
107,196,335✔
UNCOV
2742
    if (pResultInfo->decompBuf == NULL) {
×
UNCOV
2743
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
×
UNCOV
2744
      if (pResultInfo->decompBuf == NULL) {
×
UNCOV
2745
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
UNCOV
2746
        return terrno;
×
2747
      }
UNCOV
2748
      pResultInfo->decompBufSize = payloadLen;
×
2749
    } else {
UNCOV
2750
      if (pResultInfo->decompBufSize < payloadLen) {
×
UNCOV
2751
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
×
UNCOV
2752
        if (p == NULL) {
×
UNCOV
2753
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
UNCOV
2754
          return terrno;
×
2755
        }
2756

UNCOV
2757
        pResultInfo->decompBuf = p;
×
UNCOV
2758
        pResultInfo->decompBufSize = payloadLen;
×
2759
      }
2760
    }
2761
  }
2762

2763
  if (payloadLen > 0) {
107,196,323✔
2764
    int32_t compLen = *(int32_t*)pRsp->data;
99,743,565✔
2765
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
99,743,565✔
2766

2767
    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
99,743,823✔
2768

2769
    if (pRsp->compressed && compLen < rawLen) {
99,743,307✔
UNCOV
2770
      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
UNCOV
2771
      if (len < 0) {
×
2772
        tscError("tsDecompressString failed");
×
2773
        return terrno ? terrno : TSDB_CODE_FAILED;
×
2774
      }
UNCOV
2775
      if (len != rawLen) {
×
UNCOV
2776
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
×
UNCOV
2777
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2778
      }
UNCOV
2779
      pResultInfo->pData = pResultInfo->decompBuf;
×
UNCOV
2780
      pResultInfo->payloadLen = rawLen;
×
2781
    } else {
2782
      pResultInfo->pData = pStart;
99,743,309✔
2783
      pResultInfo->payloadLen = htonl(pRsp->compLen);
99,743,823✔
2784
      if (pRsp->compLen != pRsp->payloadLen) {
99,743,823✔
UNCOV
2785
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
×
2786
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2787
      }
2788
    }
2789
  }
2790

2791
  // TODO handle the compressed case
2792
  pResultInfo->totalRows += pResultInfo->numOfRows;
107,196,065✔
2793

2794
  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
107,197,095✔
2795
  return code;
107,193,206✔
2796
}
2797

2798
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
989✔
2799
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
989✔
2800
  void*              clientRpc = NULL;
989✔
2801
  SServerStatusRsp   statusRsp = {0};
989✔
2802
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
989✔
2803
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
989✔
2804
  SRpcMsg  rpcRsp = {0};
989✔
2805
  SRpcInit rpcInit = {0};
989✔
2806
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
989✔
2807

2808
  rpcInit.label = "CHK";
989✔
2809
  rpcInit.numOfThreads = 1;
989✔
2810
  rpcInit.cfp = NULL;
989✔
2811
  rpcInit.sessions = 16;
989✔
2812
  rpcInit.connType = TAOS_CONN_CLIENT;
989✔
2813
  rpcInit.idleTime = tsShellActivityTimer * 1000;
989✔
2814
  rpcInit.compressSize = tsCompressMsgSize;
989✔
2815
  rpcInit.user = "_dnd";
989✔
2816

2817
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
989✔
2818
  connLimitNum = TMAX(connLimitNum, 10);
989✔
2819
  connLimitNum = TMIN(connLimitNum, 500);
989✔
2820
  rpcInit.connLimitNum = connLimitNum;
989✔
2821
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
989✔
2822
  rpcInit.readTimeout = tsReadTimeout;
989✔
2823
  rpcInit.ipv6 = tsEnableIpv6;
989✔
2824
  rpcInit.enableSSL = tsEnableTLS;
989✔
2825

2826
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
989✔
2827
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
989✔
2828
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
989✔
2829
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
989✔
2830
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
989✔
2831

2832
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
989✔
UNCOV
2833
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
UNCOV
2834
    goto _OVER;
×
2835
  }
2836

2837
  clientRpc = rpcOpen(&rpcInit);
989✔
2838
  if (clientRpc == NULL) {
989✔
2839
    code = terrno;
×
2840
    tscError("failed to init server status client since %s", tstrerror(code));
×
UNCOV
2841
    goto _OVER;
×
2842
  }
2843

2844
  if (fqdn == NULL) {
989✔
2845
    fqdn = tsLocalFqdn;
989✔
2846
  }
2847

2848
  if (port == 0) {
989✔
2849
    port = tsServerPort;
989✔
2850
  }
2851

2852
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
989✔
2853
  epSet.eps[0].port = (uint16_t)port;
989✔
2854
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
989✔
2855
  if (TSDB_CODE_SUCCESS != ret) {
989✔
UNCOV
2856
    tscError("failed to send recv since %s", tstrerror(ret));
×
UNCOV
2857
    goto _OVER;
×
2858
  }
2859

2860
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
989✔
2861
    tscError("failed to send server status req since %s", terrstr());
146✔
2862
    goto _OVER;
146✔
2863
  }
2864

2865
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
843✔
UNCOV
2866
    tscError("failed to parse server status rsp since %s", terrstr());
×
UNCOV
2867
    goto _OVER;
×
2868
  }
2869

2870
  code = statusRsp.statusCode;
843✔
2871
  if (details != NULL) {
843✔
2872
    tstrncpy(details, statusRsp.details, maxlen);
843✔
2873
  }
2874

2875
_OVER:
723✔
2876
  if (clientRpc != NULL) {
989✔
2877
    rpcClose(clientRpc);
989✔
2878
  }
2879
  if (rpcRsp.pCont != NULL) {
989✔
2880
    rpcFreeCont(rpcRsp.pCont);
843✔
2881
  }
2882
  return code;
989✔
2883
}
2884

2885
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
1,276✔
2886
                      int32_t acctId, char* db) {
2887
  SName name = {0};
1,276✔
2888

2889
  if (len1 <= 0) {
1,276✔
UNCOV
2890
    return -1;
×
2891
  }
2892

2893
  const char* dbName = db;
1,276✔
2894
  const char* tbName = NULL;
1,276✔
2895
  int32_t     dbLen = 0;
1,276✔
2896
  int32_t     tbLen = 0;
1,276✔
2897
  if (len2 > 0) {
1,276✔
UNCOV
2898
    dbName = str + pos1;
×
UNCOV
2899
    dbLen = len1;
×
2900
    tbName = str + pos2;
×
2901
    tbLen = len2;
×
2902
  } else {
2903
    dbLen = strlen(db);
1,276✔
2904
    tbName = str + pos1;
1,276✔
2905
    tbLen = len1;
1,276✔
2906
  }
2907

2908
  if (dbLen <= 0 || tbLen <= 0) {
1,276✔
UNCOV
2909
    return -1;
×
2910
  }
2911

2912
  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
1,276✔
UNCOV
2913
    return -1;
×
2914
  }
2915

2916
  if (tNameAddTbName(&name, tbName, tbLen)) {
1,276✔
UNCOV
2917
    return -1;
×
2918
  }
2919

2920
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
1,276✔
2921
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);
1,276✔
2922

2923
  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
1,276✔
2924
  if (pDb) {
1,276✔
UNCOV
2925
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
×
UNCOV
2926
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
2927
    }
2928
  } else {
2929
    STablesReq db;
1,276✔
2930
    db.pTables = taosArrayInit(20, sizeof(SName));
1,276✔
2931
    if (NULL == db.pTables) {
1,276✔
UNCOV
2932
      return terrno;
×
2933
    }
2934
    tstrncpy(db.dbFName, dbFName, TSDB_DB_FNAME_LEN);
1,276✔
2935
    if (NULL == taosArrayPush(db.pTables, &name)) {
2,552✔
UNCOV
2936
      return terrno;
×
2937
    }
2938
    TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
1,276✔
2939
  }
2940

2941
  return TSDB_CODE_SUCCESS;
1,276✔
2942
}
2943

2944
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
1,276✔
2945
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
1,276✔
2946
  if (NULL == pHash) {
1,276✔
UNCOV
2947
    return terrno;
×
2948
  }
2949

2950
  bool    inEscape = false;
1,276✔
2951
  int32_t code = 0;
1,276✔
2952
  void*   pIter = NULL;
1,276✔
2953

2954
  int32_t vIdx = 0;
1,276✔
2955
  int32_t vPos[2];
1,276✔
2956
  int32_t vLen[2];
1,276✔
2957

2958
  (void)memset(vPos, -1, sizeof(vPos));
1,276✔
2959
  (void)memset(vLen, 0, sizeof(vLen));
1,276✔
2960

2961
  for (int32_t i = 0;; ++i) {
6,380✔
2962
    if (0 == *(tbList + i)) {
6,380✔
2963
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
1,276✔
2964
        vLen[vIdx] = i - vPos[vIdx];
1,276✔
2965
      }
2966

2967
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
1,276✔
2968
      if (code) {
1,276✔
UNCOV
2969
        goto _return;
×
2970
      }
2971

2972
      break;
1,276✔
2973
    }
2974

2975
    if ('`' == *(tbList + i)) {
5,104✔
2976
      inEscape = !inEscape;
×
UNCOV
2977
      if (!inEscape) {
×
UNCOV
2978
        if (vPos[vIdx] >= 0) {
×
UNCOV
2979
          vLen[vIdx] = i - vPos[vIdx];
×
2980
        } else {
UNCOV
2981
          goto _return;
×
2982
        }
2983
      }
2984

UNCOV
2985
      continue;
×
2986
    }
2987

2988
    if (inEscape) {
5,104✔
UNCOV
2989
      if (vPos[vIdx] < 0) {
×
UNCOV
2990
        vPos[vIdx] = i;
×
2991
      }
2992
      continue;
×
2993
    }
2994

2995
    if ('.' == *(tbList + i)) {
5,104✔
UNCOV
2996
      if (vPos[vIdx] < 0) {
×
UNCOV
2997
        goto _return;
×
2998
      }
2999
      if (vLen[vIdx] <= 0) {
×
UNCOV
3000
        vLen[vIdx] = i - vPos[vIdx];
×
3001
      }
UNCOV
3002
      vIdx++;
×
3003
      if (vIdx >= 2) {
×
UNCOV
3004
        goto _return;
×
3005
      }
UNCOV
3006
      continue;
×
3007
    }
3008

3009
    if (',' == *(tbList + i)) {
5,104✔
UNCOV
3010
      if (vPos[vIdx] < 0) {
×
UNCOV
3011
        goto _return;
×
3012
      }
UNCOV
3013
      if (vLen[vIdx] <= 0) {
×
3014
        vLen[vIdx] = i - vPos[vIdx];
×
3015
      }
3016

UNCOV
3017
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
×
UNCOV
3018
      if (code) {
×
UNCOV
3019
        goto _return;
×
3020
      }
3021

UNCOV
3022
      (void)memset(vPos, -1, sizeof(vPos));
×
UNCOV
3023
      (void)memset(vLen, 0, sizeof(vLen));
×
UNCOV
3024
      vIdx = 0;
×
UNCOV
3025
      continue;
×
3026
    }
3027

3028
    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
5,104✔
UNCOV
3029
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
×
UNCOV
3030
        vLen[vIdx] = i - vPos[vIdx];
×
3031
      }
UNCOV
3032
      continue;
×
3033
    }
3034

3035
    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
5,104✔
3036
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
638✔
3037
      if (vLen[vIdx] > 0) {
5,104✔
UNCOV
3038
        goto _return;
×
3039
      }
3040
      if (vPos[vIdx] < 0) {
5,104✔
3041
        vPos[vIdx] = i;
1,276✔
3042
      }
3043
      continue;
5,104✔
3044
    }
3045

3046
    goto _return;
×
3047
  }
3048

3049
  int32_t dbNum = taosHashGetSize(pHash);
1,276✔
3050
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
1,276✔
3051
  if (NULL == pReq) {
1,276✔
3052
    TSC_ERR_JRET(terrno);
×
3053
  }
3054
  pIter = taosHashIterate(pHash, NULL);
1,276✔
3055
  while (pIter) {
2,552✔
3056
    STablesReq* pDb = (STablesReq*)pIter;
1,276✔
3057
    if (NULL == taosArrayPush(*pReq, pDb)) {
2,552✔
UNCOV
3058
      TSC_ERR_JRET(terrno);
×
3059
    }
3060
    pIter = taosHashIterate(pHash, pIter);
1,276✔
3061
  }
3062

3063
  taosHashCleanup(pHash);
1,276✔
3064

3065
  return TSDB_CODE_SUCCESS;
1,276✔
3066

3067
_return:
×
3068

3069
  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
×
3070

3071
  pIter = taosHashIterate(pHash, NULL);
×
UNCOV
3072
  while (pIter) {
×
3073
    STablesReq* pDb = (STablesReq*)pIter;
×
UNCOV
3074
    taosArrayDestroy(pDb->pTables);
×
UNCOV
3075
    pIter = taosHashIterate(pHash, pIter);
×
3076
  }
3077

3078
  taosHashCleanup(pHash);
×
3079

3080
  return terrno;
×
3081
}
3082

3083
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
1,276✔
3084
  SSyncQueryParam* pParam = param;
1,276✔
3085
  pParam->pRequest->code = code;
1,276✔
3086

3087
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
1,276✔
UNCOV
3088
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3089
  }
3090
}
1,276✔
3091

3092
void syncQueryFn(void* param, void* res, int32_t code) {
651,249,347✔
3093
  SSyncQueryParam* pParam = param;
651,249,347✔
3094
  pParam->pRequest = res;
651,249,347✔
3095

3096
  if (pParam->pRequest) {
651,252,311✔
3097
    pParam->pRequest->code = code;
651,234,842✔
3098
    clientOperateReport(pParam->pRequest);
651,240,817✔
3099
  }
3100

3101
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
651,245,064✔
UNCOV
3102
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3103
  }
3104
}
651,254,240✔
3105

3106
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
650,762,106✔
3107
                        int8_t source) {
3108
  if (sql == NULL || NULL == fp) {
650,762,106✔
3109
    terrno = TSDB_CODE_INVALID_PARA;
258✔
UNCOV
3110
    if (fp) {
×
UNCOV
3111
      fp(param, NULL, terrno);
×
3112
    }
3113

UNCOV
3114
    return;
×
3115
  }
3116

3117
  size_t sqlLen = strlen(sql);
650,762,583✔
3118
  if (sqlLen > (size_t)tsMaxSQLLength) {
650,762,583✔
3119
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, tsMaxSQLLength);
1,296✔
3120
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
1,296✔
3121
    fp(param, NULL, terrno);
1,296✔
3122
    return;
1,296✔
3123
  }
3124

3125
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);
650,761,287✔
3126

3127
  SRequestObj* pRequest = NULL;
650,761,287✔
3128
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
650,764,795✔
3129
  if (code != TSDB_CODE_SUCCESS) {
650,761,705✔
UNCOV
3130
    terrno = code;
×
UNCOV
3131
    fp(param, NULL, terrno);
×
UNCOV
3132
    return;
×
3133
  }
3134

3135
  pRequest->source = source;
650,761,705✔
3136
  pRequest->body.queryFp = fp;
650,766,559✔
3137
  doAsyncQuery(pRequest, false);
650,766,746✔
3138
}
3139

3140
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
7,481✔
3141
                                 int64_t reqid) {
3142
  if (sql == NULL || NULL == fp) {
7,481✔
UNCOV
3143
    terrno = TSDB_CODE_INVALID_PARA;
×
UNCOV
3144
    if (fp) {
×
3145
      fp(param, NULL, terrno);
×
3146
    }
3147

UNCOV
3148
    return;
×
3149
  }
3150

3151
  size_t sqlLen = strlen(sql);
7,481✔
3152
  if (sqlLen > (size_t)tsMaxSQLLength) {
7,481✔
UNCOV
3153
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid, tsMaxSQLLength);
×
UNCOV
3154
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
3155
    fp(param, NULL, terrno);
×
UNCOV
3156
    return;
×
3157
  }
3158

3159
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);
7,481✔
3160

3161
  SRequestObj* pRequest = NULL;
7,481✔
3162
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
7,481✔
3163
  if (code != TSDB_CODE_SUCCESS) {
7,481✔
UNCOV
3164
    terrno = code;
×
UNCOV
3165
    fp(param, NULL, terrno);
×
UNCOV
3166
    return;
×
3167
  }
3168

3169
  pRequest->body.queryFp = fp;
7,481✔
3170
  doAsyncQuery(pRequest, false);
7,481✔
3171
}
3172

3173
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
650,646,873✔
3174
  if (NULL == taos) {
650,646,873✔
UNCOV
3175
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
UNCOV
3176
    return NULL;
×
3177
  }
3178

3179
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
650,646,873✔
3180
  if (NULL == param) {
650,652,011✔
3181
    return NULL;
×
3182
  }
3183
  int32_t code = tsem_init(&param->sem, 0, 0);
650,652,011✔
3184
  if (TSDB_CODE_SUCCESS != code) {
650,649,221✔
UNCOV
3185
    taosMemoryFree(param);
×
UNCOV
3186
    return NULL;
×
3187
  }
3188

3189
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
650,649,221✔
3190
  code = tsem_wait(&param->sem);
650,638,850✔
3191
  if (TSDB_CODE_SUCCESS != code) {
650,656,234✔
UNCOV
3192
    taosMemoryFree(param);
×
UNCOV
3193
    return NULL;
×
3194
  }
3195
  code = tsem_destroy(&param->sem);
650,656,234✔
3196
  if (TSDB_CODE_SUCCESS != code) {
650,660,759✔
3197
    tscError("failed to destroy semaphore since %s", tstrerror(code));
×
3198
  }
3199

3200
  SRequestObj* pRequest = NULL;
650,660,619✔
3201
  if (param->pRequest != NULL) {
650,660,619✔
3202
    param->pRequest->syncQuery = true;
650,657,451✔
3203
    pRequest = param->pRequest;
650,658,199✔
3204
    param->pRequest->inCallback = false;
650,657,858✔
3205
  }
3206
  taosMemoryFree(param);
650,658,835✔
3207

3208
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3209
  //          *(int64_t*)taos, pRequest);
3210

3211
  return pRequest;
650,655,336✔
3212
}
3213

3214
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
7,481✔
3215
  if (NULL == taos) {
7,481✔
UNCOV
3216
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
UNCOV
3217
    return NULL;
×
3218
  }
3219

3220
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
7,481✔
3221
  if (param == NULL) {
7,481✔
3222
    return NULL;
×
3223
  }
3224
  int32_t code = tsem_init(&param->sem, 0, 0);
7,481✔
3225
  if (TSDB_CODE_SUCCESS != code) {
7,481✔
UNCOV
3226
    taosMemoryFree(param);
×
UNCOV
3227
    return NULL;
×
3228
  }
3229

3230
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
7,481✔
3231
  code = tsem_wait(&param->sem);
7,481✔
3232
  if (TSDB_CODE_SUCCESS != code) {
7,481✔
3233
    taosMemoryFree(param);
×
UNCOV
3234
    return NULL;
×
3235
  }
3236
  SRequestObj* pRequest = NULL;
7,481✔
3237
  if (param->pRequest != NULL) {
7,481✔
3238
    param->pRequest->syncQuery = true;
7,481✔
3239
    pRequest = param->pRequest;
7,481✔
3240
  }
3241
  taosMemoryFree(param);
7,481✔
3242

3243
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3244
  //   *(int64_t*)taos, pRequest);
3245

3246
  return pRequest;
7,481✔
3247
}
3248

3249
static void fetchCallback(void* pResult, void* param, int32_t code) {
104,057,513✔
3250
  SRequestObj* pRequest = (SRequestObj*)param;
104,057,513✔
3251

3252
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
104,057,513✔
3253

3254
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
104,057,513✔
3255
           tstrerror(code), pRequest->requestId);
3256

3257
  pResultInfo->pData = pResult;
104,057,513✔
3258
  pResultInfo->numOfRows = 0;
104,057,513✔
3259

3260
  if (code != TSDB_CODE_SUCCESS) {
104,056,493✔
UNCOV
3261
    pRequest->code = code;
×
UNCOV
3262
    taosMemoryFreeClear(pResultInfo->pData);
×
UNCOV
3263
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3264
    return;
×
3265
  }
3266

3267
  if (pRequest->code != TSDB_CODE_SUCCESS) {
104,056,493✔
UNCOV
3268
    taosMemoryFreeClear(pResultInfo->pData);
×
UNCOV
3269
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
UNCOV
3270
    return;
×
3271
  }
3272

3273
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
114,710,454✔
3274
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
104,055,733✔
3275
  if (pRequest->code != TSDB_CODE_SUCCESS) {
104,054,778✔
3276
    pResultInfo->numOfRows = 0;
331✔
3277
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
331✔
3278
             tstrerror(pRequest->code), pRequest->requestId);
3279
  } else {
3280
    tscDebug(
104,054,408✔
3281
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3282
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3283

3284
    STscObj*            pTscObj = pRequest->pTscObj;
104,054,900✔
3285
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
104,057,182✔
3286
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
104,056,924✔
3287
  }
3288

3289
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
104,057,255✔
3290
}
3291

3292
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
107,734,787✔
3293
  pRequest->body.fetchFp = fp;
107,734,787✔
3294
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
107,734,787✔
3295

3296
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
107,735,045✔
3297

3298
  // this query has no results or error exists, return directly
3299
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
107,734,787✔
3300
    pResultInfo->numOfRows = 0;
×
3301
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
×
3302
    return;
4,351✔
3303
  }
3304

3305
  // all data has returned to App already, no need to try again
3306
  if (pResultInfo->completed) {
107,735,045✔
3307
    // it is a local executed query, no need to do async fetch
3308
    if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
3,677,532✔
3309
      if (pResultInfo->localResultFetched) {
1,585,602✔
3310
        pResultInfo->numOfRows = 0;
792,801✔
3311
        pResultInfo->current = 0;
792,801✔
3312
      } else {
3313
        pResultInfo->localResultFetched = true;
792,801✔
3314
      }
3315
    } else {
3316
      pResultInfo->numOfRows = 0;
2,091,930✔
3317
    }
3318

3319
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
3,677,532✔
3320
    return;
3,677,532✔
3321
  }
3322

3323
  SSchedulerReq req = {
104,056,997✔
3324
      .syncReq = false,
3325
      .fetchFp = fetchCallback,
3326
      .cbParam = pRequest,
3327
  };
3328

3329
  int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
104,056,814✔
3330
  if (TSDB_CODE_SUCCESS != code) {
104,057,009✔
3331
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
×
3332
    // pRequest->body.fetchFp(param, pRequest, code);
3333
  }
3334
}
3335

3336
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
651,279,274✔
3337
  pRequest->inCallback = true;
651,279,274✔
3338
  int64_t this = pRequest->self;
651,285,060✔
3339
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
651,267,852✔
3340
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
86,850✔
UNCOV
3341
    code = TSDB_CODE_SUCCESS;
×
UNCOV
3342
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
3343
  }
3344

3345
  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
651,267,852✔
3346
           pRequest);
3347

3348
  if (pRequest->body.queryFp != NULL) {
651,268,241✔
3349
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
651,278,719✔
3350
  }
3351

3352
  SRequestObj* pReq = acquireRequest(this);
651,289,481✔
3353
  if (pReq != NULL) {
651,286,447✔
3354
    pReq->inCallback = false;
650,298,373✔
3355
    (void)releaseRequest(this);
650,299,690✔
3356
  }
3357
}
651,286,195✔
3358

3359
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
586,303✔
3360
                       SParseSqlRes* pRes) {
3361
#ifndef TD_ENTERPRISE
3362
  return TSDB_CODE_SUCCESS;
3363
#else
3364
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
586,303✔
3365
#endif
3366
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc