• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.96
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "command.h"
22
#include "decimal.h"
23
#include "scheduler.h"
24
#include "tdatablock.h"
25
#include "tdataformat.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tmisce.h"
29
#include "tmsg.h"
30
#include "tmsgtype.h"
31
#include "tpagedbuf.h"
32
#include "tref.h"
33
#include "tsched.h"
34
#include "tversion.h"
35

36
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
37
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode);
38

39
void setQueryRequest(int64_t rId) {
709,133,978✔
40
  SRequestObj* pReq = acquireRequest(rId);
709,133,978✔
41
  if (pReq != NULL) {
709,163,285✔
42
    pReq->isQuery = true;
709,140,232✔
43
    (void)releaseRequest(rId);
709,139,564✔
44
  }
45
}
709,154,298✔
46

47
/**
 * Check that a C string is present, non-empty and no longer than maxsize.
 *
 * @param str      NUL-terminated string to check; NULL is rejected.
 * @param maxsize  largest accepted length in bytes, terminator excluded.
 * @return true when 1 <= strlen(str) <= maxsize, false otherwise.
 */
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (NULL == str) {
    return false;
  }

  const size_t length = strlen(str);
  return length != 0 && length <= maxsize;
}
59

60
// A user name is accepted when it is non-NULL, non-empty and at most TSDB_USER_LEN - 1 bytes long.
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
94,463,104✔
61

62
// A password is accepted when it is non-NULL, non-empty and at most TSDB_PASSWORD_MAX_LEN bytes long.
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_MAX_LEN); }
94,460,510✔
63

64
// A database name is accepted when it is non-NULL, non-empty and at most TSDB_DB_NAME_LEN - 1 bytes long.
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
567,328✔
65

66
/**
 * Build the lookup key that identifies an application instance.
 *
 * The key is "<user>:<auth>:<ip>:<port>", or "<auth>:<ip>:<port>" when user is
 * NULL. It is formatted into a 512-byte buffer (longer keys are silently
 * truncated by snprintf) and returned as a copy made with taosStrdup().
 *
 * @param user  user name, or NULL for the user-less key form.
 * @param auth  authentication string; may be NULL.
 * @param ip    endpoint given by the caller; may be NULL.
 * @param port  port given by the caller.
 * @return the duplicated key, or whatever taosStrdup() returns on failure
 *         (the caller in this file treats NULL as an error and frees the
 *         key with taosMemoryFreeClear()).
 */
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  // Passing a NULL pointer to "%s" is undefined behavior in C. The callers do pass a NULL ip,
  // so substitute the text "(null)" explicitly; this is assumed to match what the platform
  // printf emitted for a NULL "%s" argument, so existing keys should stay the same.
  const char* authStr = (auth != NULL) ? auth : "(null)";
  const char* ipStr = (ip != NULL) ? ip : "(null)";

  char key[512] = {0};
  if (user == NULL) {
    (void)snprintf(key, sizeof(key), "%s:%s:%d", authStr, ipStr, port);
  } else {
    (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, authStr, ipStr, port);
  }
  return taosStrdup(key);
}
75

76
/**
 * Copy src into dst, replacing the characters  "  \  \b  \f  \n  \r  \t  by
 * their two-character backslash escapes.
 *
 * At most maxDstLength bytes are written to dst. Output stops at the first
 * character (or escape pair) that would not fit entirely, so an escape
 * sequence is never split. No NUL terminator is appended.
 *
 * @param dst           output buffer; NULL yields 0.
 * @param maxDstLength  number of bytes that may be written to dst.
 * @param src           input bytes; NULL yields 0.
 * @param srcLength     number of bytes to read from src; 0 yields 0.
 * @return number of bytes written to dst.
 */
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
  if (dst == NULL || src == NULL || srcLength == 0) {
    return 0;
  }

  size_t dstLength = 0;
  for (size_t i = 0; i < srcLength; ++i) {
    // Second character of the escape pair, or 0 when src[i] is copied verbatim.
    char escaped = 0;
    switch (src[i]) {
      case '\"':
        escaped = '\"';
        break;
      case '\\':
        escaped = '\\';
        break;
      case '\b':
        escaped = 'b';
        break;
      case '\f':
        escaped = 'f';
        break;
      case '\n':
        escaped = 'n';
        break;
      case '\r':
        escaped = 'r';
        break;
      case '\t':
        escaped = 't';
        break;
      default:
        break;
    }

    // dstLength never exceeds maxDstLength, so the subtraction cannot wrap.
    const size_t needed = (escaped != 0) ? 2 : 1;
    if (needed > maxDstLength - dstLength) {
      break;
    }

    if (escaped != 0) {
      dst[dstLength++] = '\\';
      dst[dstLength++] = escaped;
    } else {
      dst[dstLength++] = src[i];
    }
  }

  return (int32_t)dstLength;
}
132

133
bool chkRequestKilled(void* param) {
2,147,483,647✔
134
  bool         killed = false;
2,147,483,647✔
135
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
136
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647✔
137
    killed = true;
1,520✔
138
  }
139

140
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
141

142
  return killed;
2,147,483,647✔
143
}
144

145
void cleanupAppInfo() {
1,556,853✔
146
  taosHashCleanup(appInfo.pInstMap);
1,556,853✔
147
  taosHashCleanup(appInfo.pInstMapByClusterId);
1,556,853✔
148
  tscInfo("cluster instance map cleaned");
1,556,853✔
149
}
1,556,853✔
150

151
static int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db,
152
                               __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType,
153
                               STscObj** pTscObj);
154

155
/**
 * Connect using an already-computed authentication string.
 *
 * Steps performed here:
 *   1. validate the inputs (token length when user is NULL, user name length
 *      otherwise, optional db name, optional TOTP code);
 *   2. build the endpoint set from ip, or from tsFirst/tsSecond when ip is NULL;
 *   3. find the application instance registered under the cluster key, or
 *      create and register a new one, while holding appInfo.mutex;
 *   4. hand over to taosConnectImpl().
 *
 * @param ip        server endpoint, or NULL to use the configured endpoints.
 * @param user      user name, or NULL for token based authentication.
 * @param auth      authentication string; when user is NULL it must be
 *                  exactly TSDB_TOKEN_LEN - 1 characters long.
 * @param totp      optional decimal TOTP code in [0, 999999]; NULL means none
 *                  (passed on as -1).
 * @param db        optional database name; NULL or "" means none.
 * @param port      when non-zero, overrides the port of the first two endpoints.
 * @param connType  forwarded to taosConnectImpl().
 * @param pObj      forwarded to taosConnectImpl().
 * @return TSDB_CODE_SUCCESS or an error code.
 */
int32_t taos_connect_by_auth(const char* ip, const char* user, const char* auth, const char* totp, const char* db,
                             uint16_t port, int connType, STscObj** pObj) {
  TSC_ERR_RET(taos_init());

  if (user == NULL) {
    if (auth == NULL || strlen(auth) != (TSDB_TOKEN_LEN - 1)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOKEN);
    }
  } else if (!validateUserName(user)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
  }
  int32_t code = 0;

  char localDb[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL && strlen(db) > 0) {
    if (!validateDbName(db)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
    }

    tstrncpy(localDb, db, sizeof(localDb));
    (void)strdequote(localDb);
  }

  int32_t totpCode = -1;
  if (totp != NULL) {
    char* endptr = NULL;
    totpCode = taosStr2Int32(totp, &endptr, 10);
    // Reject empty input, trailing garbage and values outside the six-digit range.
    if (endptr == totp || *endptr != '\0' || totpCode < 0 || totpCode > 999999) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOTP_CODE);
    }
  }

  SCorEpSet epSet = {0};
  if (ip) {
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
  } else {
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
  }

  if (port) {
    epSet.epSet.eps[0].port = port;
    epSet.epSet.eps[1].port = port;
  }

  char* key = getClusterKey(user, auth, ip, port);
  if (NULL == key) {
    TSC_ERR_RET(terrno);
  }
  // db may be NULL; never hand a NULL pointer to "%s".
  // NOTE(review): key embeds auth, so this line writes the authentication string to the log.
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
          user ? user : "", db ? db : "", key);
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
  }

  SAppInstInfo** pInst = NULL;
  code = taosThreadMutexLock(&appInfo.mutex);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    // key is still owned by this function on this path; release it before bailing out.
    taosMemoryFreeClear(key);
    TSC_ERR_RET(code);
  }

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
  SAppInstInfo* p = NULL;
  // Instance handed to taosConnectImpl(); captured while appInfo.mutex is held so that the
  // hash-table slot behind pInst is not read after the lock has been dropped.
  SAppInstInfo* pAppInst = NULL;
  if (pInst == NULL) {
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
    if (NULL == p) {
      TSC_ERR_JRET(terrno);
    }
    p->mgmtEp = epSet;
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = openTransporter(user, auth, tsNumOfCores / 2, &p->pTransporter);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    // Ownership of key moves to the instance; clear the local so it is not freed below.
    p->instKey = key;
    key = NULL;
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user ? user : "", epSet.epSet.eps[0].fqdn,
            epSet.epSet.eps[0].port);

    pAppInst = p;
  } else {
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
    }
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
    pAppInst = *pInst;
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    (void)taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    return code;
  } else {
    code = taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
      return code;
    }
    return taosConnectImpl(user, auth, totpCode, localDb, NULL, NULL, pAppInst, connType, pObj);
  }
}
275

276
/**
 * Connect with a plain-text password.
 *
 * The password length is validated first; the password is then passed
 * through taosEncryptPass_c() and the result is used as the auth argument of
 * taos_connect_by_auth(). All other arguments are forwarded unchanged.
 *
 * @return TSDB_CODE_TSC_INVALID_PASS_LENGTH for a NULL, empty or over-long
 *         password, otherwise the result of taos_connect_by_auth().
 */
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
                              uint16_t port, int connType, STscObj** pObj) {
  if (!validatePassword(pass)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
  }

  char auth[TSDB_PASSWORD_LEN + 1] = {0};
  taosEncryptPass_c((uint8_t*)pass, strlen(pass), auth);

  return taos_connect_by_auth(ip, user, auth, totp, db, port, connType, pObj);
}
286

287
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
288
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
289
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
290
//     return *ppAppInstInfo;
291
//   } else {
292
//     return NULL;
293
//   }
294
// }
295

296
/**
 * Destroy the semaphore embedded in a sync-query parameter block and free
 * the block itself. A NULL argument is ignored. A failure to destroy the
 * semaphore is only logged; the memory is released regardless.
 */
void freeQueryParam(SSyncQueryParam* param) {
  if (NULL == param) {
    return;
  }

  const bool semDestroyed = (TSDB_CODE_SUCCESS == tsem_destroy(&param->sem));
  if (!semDestroyed) {
    tscError("failed to destroy semaphore in freeQueryParam");
  }

  taosMemoryFree(param);
}
303

304
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
1,092,061,010✔
305
                     SRequestObj** pRequest, int64_t reqid) {
306
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
1,092,061,010✔
307
  if (TSDB_CODE_SUCCESS != code) {
1,092,075,929✔
308
    tscError("failed to malloc sqlObj, %s", sql);
1,156✔
309
    return code;
1,156✔
310
  }
311

312
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
1,092,074,773✔
313
  if ((*pRequest)->sqlstr == NULL) {
1,092,066,719✔
314
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
315
    destroyRequest(*pRequest);
×
316
    *pRequest = NULL;
×
317
    return terrno;
×
318
  }
319

320
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
1,092,064,434✔
321
  (*pRequest)->sqlstr[sqlLen] = 0;
1,092,110,131✔
322
  (*pRequest)->sqlLen = sqlLen;
1,092,111,070✔
323
  (*pRequest)->validateOnly = validateSql;
1,092,110,206✔
324
  (*pRequest)->stmtBindVersion = 0;
1,092,106,470✔
325

326
  code = sqlSecurityCheckStringLevel(*pRequest, (*pRequest)->sqlstr, (*pRequest)->sqlLen);
1,092,096,822✔
327
  if (code != TSDB_CODE_SUCCESS) {
1,092,059,262✔
328
    tscWarn("req:0x%" PRIx64 ", sql security string check failed, QID:0x%" PRIx64 ", code:%s", (*pRequest)->self,
1,478✔
329
            (*pRequest)->requestId, tstrerror(code));
330
    destroyRequest(*pRequest);
1,478✔
331
    *pRequest = NULL;
1,478✔
332
    return code;
1,478✔
333
  }
334

335
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
1,092,057,784✔
336

337
  STscObj* pTscObj = (*pRequest)->pTscObj;
1,092,095,206✔
338
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
1,092,079,010✔
339
                             sizeof((*pRequest)->self));
340
  if (err) {
1,092,068,705✔
341
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
342
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
343
    destroyRequest(*pRequest);
×
344
    *pRequest = NULL;
×
345
    return terrno;
×
346
  }
347

348
  (*pRequest)->allocatorRefId = -1;
1,092,068,705✔
349
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
1,092,065,748✔
350
    if (TSDB_CODE_SUCCESS !=
480,734,887✔
351
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
480,732,860✔
352
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
353
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
354
      destroyRequest(*pRequest);
×
355
      *pRequest = NULL;
×
356
      return terrno;
×
357
    }
358
  }
359

360
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
1,092,153,348✔
361
  return TSDB_CODE_SUCCESS;
1,092,044,935✔
362
}
363

364
/**
 * Build a sub-request for sql that runs ahead of pRequest.
 *
 * The new request is created on the same connection with pRequest as its
 * user parameter and the same validateOnly flag. On success the two requests
 * are linked: pRequest's prevRefId refers to the new request, and the new
 * request's nextRefId and userRefId refer to pRequest. The new request is
 * marked as a sub-request.
 *
 * @return the result of buildRequest(); nothing is linked on failure.
 */
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
  int32_t code =
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pPrev = *pNewRequest;
  pRequest->relation.prevRefId = pPrev->self;
  pPrev->relation.nextRefId = pRequest->self;
  pPrev->relation.userRefId = pRequest->self;
  pPrev->isSubReq = true;

  return code;
}
375

376
/**
 * Parse the SQL text of a request.
 *
 * A parse context is filled from the request and its connection, the catalog
 * handle of the connection's cluster is looked up, and qParseSql() is run.
 * When the parsed query has a result set, the result schema and precision
 * are copied into the request's result info. On success, or on an error for
 * which NEED_CLIENT_HANDLE_ERROR() holds, the db/table/target-table lists
 * are swapped from the query into the request.
 *
 * @param pRequest    request holding the SQL text.
 * @param topicQuery  stored in the parse context.
 * @param pQuery      receives the parsed query.
 * @param pStmtCb     statement callback stored in the parse context.
 * @return the code from catalogGetHandle(), qParseSql() or setResSchemaInfo().
 */
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj*      pTscObj = pRequest->pTscObj;
  SAppInstInfo* pAppInfo = pTscObj->pAppInfo;

  SParseContext cxt = {
      .requestId = pRequest->requestId,
      .requestRid = pRequest->self,
      .acctId = pTscObj->acctId,
      .db = pRequest->pDb,
      .topicQuery = topicQuery,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pAppInfo->pTransporter,
      .pStmtCb = pStmtCb,
      .pUser = pTscObj->user,
      .userId = pTscObj->userId,
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
      .enableSysInfo = pTscObj->sysInfo,
      .minSecLevel = pTscObj->minSecLevel,
      .maxSecLevel = pTscObj->maxSecLevel,
      .macMode = pAppInfo->serverCfg.macActive,  // propagates cluster-level MAC state into parser/executor
      .sodInitial = pAppInfo->serverCfg.sodInitial,
      .svrVer = pTscObj->sVer,
      .nodeOffline = (pAppInfo->onlineDnodes < pAppInfo->totalDnodes),
      .stmtBindVersion = pRequest->stmtBindVersion,
      .setQueryFp = setQueryRequest,
      .timezone = pTscObj->optionInfo.timezone,
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
  };

  cxt.mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = qParseSql(&cxt, pQuery);
  if (TSDB_CODE_SUCCESS == code && (*pQuery)->haveResultSet) {
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
    code = setResSchemaInfo(pResInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols, (*pQuery)->pResExtSchema,
                            pRequest->stmtBindVersion > 0);
    setResPrecision(pResInfo, (*pQuery)->precision);
  }

  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
  }

  // Position arrays attached to the context are no longer needed after parsing.
  taosArrayDestroy(cxt.pTableMetaPos);
  taosArrayDestroy(cxt.pTableVgroupPos);

  return code;
}
433

UNCOV
434
/**
 * Run a locally executed command synchronously.
 *
 * qExecCommand() is invoked on the query's root node; when it succeeds and
 * produces a response, that response is installed as the request's result
 * through setQueryResultFromRsp().
 *
 * @return the code from qExecCommand(), or from setQueryResultFromRsp() when
 *         a response was installed.
 */
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  STscObj*           pTscObj = pRequest->pTscObj;
  SRetrieveTableRsp* pRsp = NULL;
  int8_t             biMode = atomic_load_8(&pTscObj->biMode);

  int32_t code =
      qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode, pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS != code || NULL == pRsp) {
    return code;
  }

  return setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
                               pRequest->stmtBindVersion > 0);
}
446

447
/**
 * Send a DDL command to the server and wait for the response.
 *
 * When the query carries no command message (e.g. "drop table if exists
 * not_exists_table") there is nothing to send and success is returned.
 * Otherwise the message buffer is moved from the command info into the
 * request, the message is sent with asyncSendMsgToServer(), and the call
 * waits on the request's response semaphore.
 *
 * @return TSDB_CODE_SUCCESS, or the error from sending or from tsem_wait().
 */
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (NULL == pCmd) {
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pTscObj = pRequest->pTscObj;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pCmd->epSet, NULL, pSendInfo));
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
  return TSDB_CODE_SUCCESS;
}
466

467
// Shorthand for the application instance info reached through the request's connection object.
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
1,815,359,316✔
468

469
/**
 * Run a locally executed command and complete the request through its
 * callback.
 *
 * Validate-only requests are completed immediately with code 0. Otherwise
 * qExecCommand() is run; a produced response is installed as the request's
 * result. The final code is stored in the request, logged, and passed to
 * doRequestCallback(). On failure the result's row count is reset to 0.
 */
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  STscObj*           pTscObj = pRequest->pTscObj;
  SReqResultInfo*    pResultInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRsp = NULL;

  int32_t code = qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, atomic_load_8(&pTscObj->biMode),
                              pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
    code = setQueryResultFromRsp(pResultInfo, pRsp, pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
  }

  pRequest->code = code;

  if (TSDB_CODE_SUCCESS == pRequest->code) {
    tscDebug(
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
  } else {
    pResultInfo->numOfRows = 0;
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
             pRequest->requestId);
  }

  doRequestCallback(pRequest, code);
}
498

499
/**
 * Send a DDL command to the server without waiting for the response.
 *
 * Validate-only requests, and queries without a command message (e.g. "drop
 * table if exists not_exists_table"), are completed immediately through
 * doRequestCallback() with code 0. Otherwise the command message is detached
 * from the query, its buffer is moved into the request, and the message is
 * sent. When sending fails the request is completed with the error code.
 *
 * @return TSDB_CODE_SUCCESS, or the error from asyncSendMsgToServer().
 */
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly || NULL == pQuery->pCmdMsg) {
    doRequestCallback(pRequest, 0);
    return TSDB_CODE_SUCCESS;
  }

  // Detach the command message from the query BEFORE sending: the response callback may run
  // on another thread and destroy the query, and must not free the command message as well.
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  pQuery->pCmdMsg = NULL;

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pCmd->epSet, NULL, pSendInfo);

  // The message buffer now belongs to the request and the epSet has been used by the send
  // call, so only the SCmdMsgInfo shell is left. It is freed through the local pointer because
  // pQuery may already have been destroyed by the response callback at this point.
  taosMemoryFree(pCmd);

  if (code) {
    doRequestCallback(pRequest, code);
  }
  return code;
}
533

534
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
173,435✔
535
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
173,435✔
536
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
173,435✔
537

538
  if (node1->load < node2->load) {
173,435✔
UNCOV
539
    return -1;
×
540
  }
541

542
  return node1->load > node2->load;
173,435✔
543
}
544

545
/**
 * Replace the cached qnode list of an application instance.
 *
 * Under pInfo->qnodeMutex the current list is destroyed and, when pNodeList
 * is non-NULL, replaced by a copy of pNodeList sorted by ascending load.
 * pNodeList itself is not modified and stays owned by the caller.
 *
 * @param pInfo      application instance whose cache is updated.
 * @param pNodeList  new list to copy, or NULL to just clear the cache.
 * @return TSDB_CODE_SUCCESS, a mutex lock/unlock error, or the error left in
 *         terrno when the list could not be copied (the cache is then empty).
 */
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  int32_t code = TSDB_CODE_SUCCESS;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    taosArrayDestroy(pInfo->pQnodeList);
    pInfo->pQnodeList = NULL;
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
  }

  if (pNodeList) {
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
    if (NULL == pInfo->pQnodeList) {
      // Do not sort or report success when the copy failed; the mutex is still released below.
      code = terrno;
      tscError("failed to duplicate qnode list in cluster 0x%" PRIx64 ", code:%s", pInfo->clusterId, tstrerror(code));
    } else {
      taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
      tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%d", pInfo->clusterId,
               (int32_t)taosArrayGetSize(pInfo->pQnodeList));
    }
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));

  return code;
}
563

564
/**
 * Tell whether the qnode list still has to be fetched for this request.
 *
 * With the vnode or client query policy no qnode list is needed. For any
 * other policy the list is required exactly when the application instance
 * has no cached qnode list; the cache is inspected under qnodeMutex.
 *
 * @param pRequest  request whose application instance is inspected.
 * @param required  receives the answer.
 * @return TSDB_CODE_SUCCESS or a mutex lock/unlock error.
 */
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
  *required = false;
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pAppInfo = pRequest->pTscObj->pAppInfo;

  TSC_ERR_RET(taosThreadMutexLock(&pAppInfo->qnodeMutex));
  *required = (NULL == pAppInfo->pQnodeList);
  TSC_ERR_RET(taosThreadMutexUnlock(&pAppInfo->qnodeMutex));

  return TSDB_CODE_SUCCESS;
}
579

UNCOV
580
/**
 * Obtain a qnode list for a request.
 *
 * A copy of the application instance's cached list is returned when one
 * exists. Otherwise a new list is fetched through the catalog and, on
 * success, also stored in the cache via updateQnodeList().
 *
 * @param pRequest   request used for connection and catalog access.
 * @param pNodeList  in/out: *pNodeList is tested for NULL after the cache
 *                   lookup, so the caller has to initialize it (presumably
 *                   to NULL — confirm against callers). On return it holds
 *                   the list, which the caller owns.
 * @return TSDB_CODE_SUCCESS or an error code.
 */
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
  if (NULL == *pNodeList) {
    SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
      // Check the freshly allocated array, not the out-parameter itself (which was already
      // dereferenced above and therefore can never be NULL here).
      if (NULL == *pNodeList) {
        return terrno;
      }
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
    }

    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
      code = updateQnodeList(pInfo, *pNodeList);
    }
  }

  return code;
}
611

612
/**
 * Create the query plan for a parsed query.
 *
 * The request type is set from the query's message type, a plan context is
 * filled from the request, its connection and its application instance, and
 * qCreateQueryPlan() is called.
 *
 * @param pRequest   request being planned.
 * @param pQuery     parsed query.
 * @param pPlan      receives the plan.
 * @param pNodeList  node list forwarded to qCreateQueryPlan().
 * @return the result of qCreateQueryPlan().
 */
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  STscObj*      pTscObj = pRequest->pTscObj;
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);

  SPlanContext cxt = {
      .queryId = pRequest->requestId,
      .acctId = pTscObj->acctId,
      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
      .pAstRoot = pQuery->pRoot,
      .showRewrite = pQuery->showRewrite,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pUser = pTscObj->user,
      .userId = pTscObj->userId,
      .timezone = pTscObj->optionInfo.timezone,
      .sysInfo = pTscObj->sysInfo,
      .minSecLevel = pTscObj->minSecLevel,
      .maxSecLevel = pTscObj->maxSecLevel,
      .macMode = pAppInfo->serverCfg.macActive,
  };

  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
633

634
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
288,625,245✔
635
                         const SExtSchema* pExtSchema, bool isStmt) {
636
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
288,625,245✔
637
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
9✔
UNCOV
638
    return TSDB_CODE_INVALID_PARA;
×
639
  }
640

641
  pResInfo->numOfCols = numOfCols;
288,637,843✔
642
  if (pResInfo->fields != NULL) {
288,636,777✔
643
    taosMemoryFree(pResInfo->fields);
15,501✔
644
  }
645
  if (pResInfo->userFields != NULL) {
288,636,328✔
646
    taosMemoryFree(pResInfo->userFields);
15,501✔
647
  }
648
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
288,637,122✔
649
  if (NULL == pResInfo->fields) return terrno;
288,630,172✔
650
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
288,632,239✔
651
  if (NULL == pResInfo->userFields) {
288,625,335✔
UNCOV
652
    taosMemoryFree(pResInfo->fields);
×
UNCOV
653
    return terrno;
×
654
  }
655
  if (numOfCols != pResInfo->numOfCols) {
288,626,062✔
UNCOV
656
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
UNCOV
657
    return TSDB_CODE_FAILED;
×
658
  }
659

660
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
1,468,461,241✔
661
    pResInfo->fields[i].type = pSchema[i].type;
1,179,820,610✔
662

663
    pResInfo->userFields[i].type = pSchema[i].type;
1,179,821,108✔
664
    // userFields must convert to type bytes, no matter isStmt or not
665
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
1,179,823,885✔
666
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
1,179,826,272✔
667
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
1,179,831,514✔
668
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
1,594,552✔
669
    }
670

671
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
1,179,835,414✔
672
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
1,179,838,157✔
673
  }
674
  return TSDB_CODE_SUCCESS;
288,643,236✔
675
}
676

677
/*
 * Store the timestamp precision on the result info.
 * Only milli/micro/nano precisions are accepted; any other value leaves pResInfo untouched.
 */
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  switch (precision) {
    case TSDB_TIME_PRECISION_MILLI:
    case TSDB_TIME_PRECISION_MICRO:
    case TSDB_TIME_PRECISION_NANO:
      pResInfo->precision = precision;
      break;
    default:
      // unknown precision, keep the current value
      break;
  }
}
685

686
/*
 * Build the node list used by the vnode/client query policies.
 *
 * One SQueryNodeLoad entry is added for every vgroup found in pDbVgList (an array of per-db vgroup arrays).
 * When no vgroup is collected, the content of pMnodeList is copied instead; if that is empty too, an empty
 * list is returned.
 *
 * On success the new array is stored in *pNodeList and TSDB_CODE_SUCCESS is returned.
 * On failure the array is destroyed, *pNodeList is not written and an error code is returned.
 */
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
  SArray* pLoads = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pLoads) {
    return terrno;
  }

  // only used for logging
  char* policyName = (QUERY_POLICY_VNODE == tsQueryPolicy) ? "vnode" : "client";

  int32_t numOfDbs = taosArrayGetSize(pDbVgList);
  for (int32_t dbIdx = 0; dbIdx < numOfDbs; ++dbIdx) {
    SArray* pVgroups = taosArrayGetP(pDbVgList, dbIdx);
    if (NULL == pVgroups) {
      continue;
    }

    int32_t numOfVgroups = taosArrayGetSize(pVgroups);
    for (int32_t vgIdx = 0; vgIdx < numOfVgroups; ++vgIdx) {
      SVgroupInfo* pVgroup = taosArrayGet(pVgroups, vgIdx);
      if (NULL == pVgroup) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }

      SQueryNodeLoad entry = {0};
      entry.addr.nodeId = pVgroup->vgId;
      entry.addr.epSet = pVgroup->epSet;

      if (NULL == taosArrayPush(pLoads, &entry)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }
    }
  }

  int32_t numOfVnodes = taosArrayGetSize(pLoads);
  if (numOfVnodes > 0) {
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policyName, numOfVnodes);
  } else {
    // no vgroup available, fall back to the mnode list
    int32_t numOfMnodes = taosArrayGetSize(pMnodeList);
    if (numOfMnodes <= 0) {
      tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policyName);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirstMnode) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pLoads, pFirstMnode, numOfMnodes)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policyName, numOfMnodes);
    }
  }

  *pNodeList = pLoads;
  return TSDB_CODE_SUCCESS;
}
751

752
/*
 * Build the node list used by the qnode/hybrid query policies.
 *
 * The content of pQnodeList is copied when it is not empty; otherwise the content of pMnodeList is copied.
 * If both are empty, an empty list is returned.
 *
 * On success the new array is stored in *pNodeList and TSDB_CODE_SUCCESS is returned.
 * On failure the array is destroyed, *pNodeList is not written and an error code is returned.
 */
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
  SArray* pLoads = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pLoads) {
    return terrno;
  }

  int32_t numOfQnodes = taosArrayGetSize(pQnodeList);
  if (numOfQnodes > 0) {
    void* pFirstQnode = taosArrayGet(pQnodeList, 0);
    if (NULL == pFirstQnode) {
      taosArrayDestroy(pLoads);
      return TSDB_CODE_OUT_OF_RANGE;
    }
    if (NULL == taosArrayAddBatch(pLoads, pFirstQnode, numOfQnodes)) {
      taosArrayDestroy(pLoads);
      return terrno;
    }
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, numOfQnodes);
  } else {
    // no qnode available, fall back to the mnode list
    int32_t numOfMnodes = taosArrayGetSize(pMnodeList);
    if (numOfMnodes <= 0) {
      tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirstMnode) {
        taosArrayDestroy(pLoads);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pLoads, pFirstMnode, numOfMnodes)) {
        taosArrayDestroy(pLoads);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, numOfMnodes);
    }
  }

  *pNodeList = pLoads;
  return TSDB_CODE_SUCCESS;
}
797

798
void freeVgList(void* list) {
8,414,190✔
799
  SArray* pList = *(SArray**)list;
8,414,190✔
800
  taosArrayDestroy(pList);
8,415,967✔
801
}
8,403,984✔
802

803
/*
 * Build the execution node list for an asynchronous query according to tsQueryPolicy.
 *
 * vnode/client policy: the per-db vgroup lists are taken from pResultMeta->pDbVgroup when pResultMeta is
 * given, otherwise they are fetched from the catalog for every db in pRequest->dbList.
 * qnode/hybrid policy: the qnode list is duplicated from pResultMeta->pQnodeList when available, otherwise
 * from the app instance (under qnodeMutex).
 *
 * @param pRequest     request being executed
 * @param pNodeList    output, written by buildVnodePolicyNodeList/buildQnodePolicyNodeList on success
 * @param pMnodeList   mnode list passed through as fallback
 * @param pResultMeta  catalog meta result, may be NULL
 * @return TSDB_CODE_SUCCESS or an error code; TSDB_CODE_APP_ERROR for an unknown policy
 */
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  FDelete fp = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      if (pResultMeta) {
        // fp stays NULL on this path, so the vgroup arrays pushed below are not freed by this function
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
        for (int32_t i = 0; i < dbNum; ++i) {
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
          if (NULL == pRes || pRes->code || NULL == pRes->pRes) {
            continue;
          }

          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
            code = terrno;
            goto _return;
          }
        }
      } else {
        // the vgroup arrays returned by the catalog are destroyed together with pDbVgList
        fp = freeVgList;

        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
        if (dbNum > 0) {
          SCatalog*     pCtg = NULL;
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
          code = catalogGetHandle(pInst->clusterId, &pCtg);
          if (code != TSDB_CODE_SUCCESS) {
            goto _return;
          }

          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
          if (NULL == pDbVgList) {
            code = terrno;
            goto _return;
          }
          SArray* pVgList = NULL;
          for (int32_t i = 0; i < dbNum; ++i) {
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                     .requestId = pRequest->requestId,
                                     .requestObjRefId = pRequest->self,
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

            // catalogGetDBVgList will handle dbFName == null.
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
            if (code) {
              goto _return;
            }

            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
              code = terrno;
              // pVgList never made it into pDbVgList, so the cleanup at _return would not free it
              taosArrayDestroy(pVgList);
              pVgList = NULL;
              goto _return;
            }
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
        if (NULL == pRes || pRes->code) {
          pQnodeList = NULL;
        } else {
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            goto _return;
          }
        }
      } else {
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
        if (pInst->pQnodeList) {
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
          if (NULL == pQnodeList) {
            // capture the error first, then release the mutex before leaving
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            (void)taosThreadMutexUnlock(&pInst->qnodeMutex);
            goto _return;
          }
        }
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
      }

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:
  taosArrayDestroyEx(pDbVgList, fp);
  taosArrayDestroy(pQnodeList);

  return code;
}
912

913
/*
 * Build the execution node list for a synchronous query according to tsQueryPolicy.
 *
 * vnode/client policy: the vgroup list of every db in pRequest->dbList is fetched from the catalog and
 * handed to buildVnodePolicyNodeList.
 * qnode/hybrid policy: the qnode list is obtained via getQnodeList and handed to buildQnodePolicyNodeList.
 *
 * @param pRequest    request being executed
 * @param pNodeList   output, written by the policy specific builder on success
 * @param pMnodeList  mnode list passed through as fallback
 * @return TSDB_CODE_SUCCESS or an error code; TSDB_CODE_APP_ERROR for an unknown policy
 */
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
      if (dbNum > 0) {
        SCatalog*     pCtg = NULL;
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        code = catalogGetHandle(pInst->clusterId, &pCtg);
        if (code != TSDB_CODE_SUCCESS) {
          goto _return;
        }

        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        SArray* pVgList = NULL;
        for (int32_t i = 0; i < dbNum; ++i) {
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                   .requestId = pRequest->requestId,
                                   .requestObjRefId = pRequest->self,
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

          // catalogGetDBVgList will handle dbFName == null.
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
          if (code) {
            goto _return;
          }

          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
            code = terrno;
            // pVgList never made it into pDbVgList, so the cleanup at _return would not free it
            taosArrayDestroy(pVgList);
            pVgList = NULL;
            goto _return;
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:

  // every vgroup array stored in pDbVgList is destroyed through freeVgList
  taosArrayDestroyEx(pDbVgList, freeVgList);
  taosArrayDestroy(pQnodeList);

  return code;
}
978

979
/*
 * Run the query plan synchronously through the scheduler.
 *
 * The execution result replaces pRequest->body.resInfo.execRes. For submit/delete/create-table requests
 * the affected row count is recorded and the scheduler job is freed right away.
 *
 * @return the final code, which is also stored in pRequest->code and terrno
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;

  SExecResult      execRes = {0};
  SRequestConnInfo connInfo = {
      .pTrans = pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
  SSchedulerReq schedReq = {
      .syncReq = true,
      .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
      .pConn = &connInfo,
      .pNodeList = pNodeList,
      .pDag = pDag,
      .sql = pRequest->sqlstr,
      .startTs = pRequest->metric.start,
      .execFp = NULL,
      .cbParam = NULL,
      .chkKillFp = chkRequestKilled,
      .chkKillParam = (void*)pRequest->self,
      .pExecRes = &execRes,
      .source = pRequest->source,
      .secureDelete = pRequest->secureDelete,
      .pWorkerCb = getTaskPoolWorkerCb(),
  };

  int32_t code = schedulerExecJob(&schedReq, &pRequest->body.queryJob);

  // the previous result is released before the new one is copied over it
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
  (void)memcpy(&pRequest->body.resInfo.execRes, &execRes, sizeof(execRes));

  if (TSDB_CODE_SUCCESS == code) {
    bool isWriteReq = (TDMT_VND_SUBMIT == pRequest->type) || (TDMT_VND_DELETE == pRequest->type) ||
                      (TDMT_VND_CREATE_TABLE == pRequest->type);
    if (isWriteReq) {
      pRequest->body.resInfo.numOfRows = execRes.numOfRows;
      if (TDMT_VND_SUBMIT == pRequest->type) {
        // account the inserted rows in the cluster summary
        SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, execRes.numOfRows);
      }

      schedulerFreeJob(&pRequest->body.queryJob, 0);
    }

    // the scheduler call succeeded, report the code carried by the execution result
    code = execRes.code;
  } else {
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  pRequest->code = code;
  terrno = code;
  return pRequest->code;
}
1033

1034
/*
 * Post-process a submit response: for every auto-created table that carries meta, the catalog is updated
 * through handleCreateTbExecRes. The first failure is returned (via TSC_ERR_RET).
 * pRequest and epset are not used by this function.
 */
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SSubmitRsp2* pSubmitRsp = (SSubmitRsp2*)res;
  if (NULL != pSubmitRsp->aCreateTbRsp) {
    int32_t numOfTables = taosArrayGetSize(pSubmitRsp->aCreateTbRsp);
    for (int32_t idx = 0; idx < numOfTables; ++idx) {
      SVCreateTbRsp* pCreateRsp = (SVCreateTbRsp*)taosArrayGet(pSubmitRsp->aCreateTbRsp, idx);
      if (NULL == pCreateRsp->pMeta) {
        continue;
      }
      TSC_ERR_RET(handleCreateTbExecRes(pCreateRsp->pMeta, pCatalog));
    }
  }

  return TSDB_CODE_SUCCESS;
}
1051

1052
/*
 * Post-process a query response: `res` is an array of STbVerInfo describing the table versions seen by
 * the server. They are converted to STbSVersion entries and passed to catalogChkTbMetaVersion.
 *
 * @return TSDB_CODE_SUCCESS when the array is empty, otherwise the result of the conversion/check
 */
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SArray* pVerInfos = (SArray*)res;
  int32_t numOfTables = taosArrayGetSize(pVerInfos);
  if (numOfTables <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(numOfTables, sizeof(STbSVersion));
  if (NULL == pVersions) {
    return terrno;
  }

  int32_t code = 0;
  for (int32_t idx = 0; idx < numOfTables; ++idx) {
    STbVerInfo* pInfo = taosArrayGet(pVerInfos, idx);
    if (NULL == pInfo) {
      code = terrno;
      goto _return;
    }

    STbSVersion ver = {0};
    ver.tbFName = pInfo->tbFName;
    ver.sver = pInfo->sversion;
    ver.tver = pInfo->tversion;
    ver.rver = pInfo->rversion;
    if (NULL == taosArrayPush(pVersions, &ver)) {
      code = terrno;
      goto _return;
    }
  }

  SRequestConnInfo connInfo = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = *epset};

  code = catalogChkTbMetaVersion(pCatalog, &connInfo, pVersions);

_return:

  taosArrayDestroy(pVersions);
  return code;
}
1092

1093
// Forward the table meta carried by an alter-table response to the catalog (catalogUpdateTableMeta).
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}
1096

1097
// Forward the table meta carried by a create-table response to the catalog (catalogAsyncUpdateTableMeta).
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogAsyncUpdateTableMeta(pCatalog, pMetaRsp);
}
1100

1101
/*
 * Dispatch the execution result stored in pRequest->body.resInfo.execRes to the handler matching its
 * message type, so that the catalog can be refreshed.
 *
 * @return pRequest->code when there is no execution result, TSDB_CODE_APP_ERROR for an unexpected message
 *         type, otherwise the code returned by the handler
 */
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
  SExecResult* pExecRes = &pRequest->body.resInfo.execRes;
  if (NULL == pExecRes->res) {
    return pRequest->code;
  }

  SCatalog*     pCtg = NULL;
  SAppInstInfo* pInst = getAppInfo(pRequest);

  int32_t code = catalogGetHandle(pInst->clusterId, &pCtg);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SEpSet mgmtEpSet = getEpSet_s(&pInst->mgmtEp);

  switch (pExecRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB:
      code = handleAlterTbExecRes(pExecRes->res, pCtg);
      break;

    case TDMT_VND_CREATE_TABLE: {
      SArray* pMetaList = (SArray*)pExecRes->res;
      int32_t numOfMetas = taosArrayGetSize(pMetaList);
      for (int32_t idx = 0; idx < numOfMetas; ++idx) {
        // handleCreateTbExecRes will handle res == null
        // note: code is overwritten on every iteration, only the last table's result is kept
        code = handleCreateTbExecRes(taosArrayGetP(pMetaList, idx), pCtg);
      }
      break;
    }

    case TDMT_MND_CREATE_STB:
      code = handleCreateTbExecRes(pExecRes->res, pCtg);
      break;

    case TDMT_VND_SUBMIT:
      // account the submitted bytes before the response is processed
      (void)atomic_add_fetch_64((int64_t*)&pInst->summary.insertBytes, pExecRes->numOfBytes);
      code = handleSubmitExecRes(pRequest, pExecRes->res, pCtg, &mgmtEpSet);
      break;

    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY:
      code = handleQueryExecRes(pRequest, pExecRes->res, pCtg, &mgmtEpSet);
      break;

    default:
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
               pRequest->type, pRequest->requestId);
      code = TSDB_CODE_APP_ERROR;
      break;
  }

  return code;
}
1156

1157
// Returns the fileProcessing flag of a vnode-modify statement; false for every other statement type.
static bool incompletaFileParsing(SNode* pStmt) {
  if (QUERY_NODE_VNODE_MODIFY_STMT == nodeType(pStmt)) {
    return ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
  }
  return false;
}
1160

UNCOV
1161
/*
 * Resume parsing and planning of pRequest once its sub query has produced pBlock (may be NULL), then
 * hand the outcome to handleQueryAnslyseRes.
 *
 * The first error encountered (acquire, parse, plan or release) is the one reported; a successful
 * allocator release must not hide an earlier parse/plan failure.
 */
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;

  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    int64_t analyseStart = taosGetTimestampUs();
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
  }

  // the release is attempted unconditionally, but its result is only used when nothing failed before
  int32_t releaseCode = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    code = releaseCode;
  }

  handleQueryAnslyseRes(pWrapper, NULL, code);
}
×
1178

1179
/*
 * Deliver the result of pRequest to the user.
 * When pRequest is the user request itself (or has no user request), its own callback is invoked.
 * Otherwise the code is copied to the user request referenced by relation.userRefId and that request's
 * callback is invoked; an error is logged when the user request cannot be acquired.
 */
void returnToUser(SRequestObj* pRequest) {
  if (0 == pRequest->relation.userRefId || pRequest->self == pRequest->relation.userRefId) {
    // return to client
    doRequestCallback(pRequest, pRequest->code);
    return;
  }

  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
  if (NULL == pUserReq) {
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.userRefId, pRequest->requestId);
    return;
  }

  pUserReq->code = pRequest->code;
  // return to client
  doRequestCallback(pUserReq, pUserReq->code);
  (void)releaseRequest(pRequest->relation.userRefId);
}
1198

1199
/*
 * Copy numOfRows rows fetched from pRes into a newly created data block.
 *
 * Every row is read as (int64 ts, int32 vgId, int64 vgVer) in its first three columns; the largest ts is
 * stored in the block's window.ekey.
 *
 * @param pRes       result set to read from
 * @param numOfRows  number of rows to fetch
 * @param pBlock     output; holds the new block on success and is reset to NULL when the block had to be
 *                   destroyed because of an error
 * @return TSDB_CODE_SUCCESS or an error code
 */
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
  int64_t     lastTs = 0;
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
  int32_t     numOfFields = taos_num_fields(pRes);

  // the row loop below reads columns 0..2, so rows without three columns cannot be processed
  if ((numOfFields > 0 && NULL == pResFields) || (numOfRows > 0 && numOfFields < 3)) {
    tscError("invalid data from vnode");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = createDataBlock(pBlock);
  if (code) {
    return code;
  }

  for (int32_t i = 0; i < numOfFields; ++i) {
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(*pBlock, numOfRows);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfRows; ++i) {
    TAOS_ROW pRow = taos_fetch_row(pRes);
    // pRow itself is NULL when fewer rows than announced are available
    if (NULL == pRow || NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
      tscError("invalid data from vnode");
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto _error;
    }
    int64_t ts = *(int64_t*)pRow[0];
    if (lastTs < ts) {
      lastTs = ts;
    }

    for (int32_t j = 0; j < numOfFields; ++j) {
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
      if (NULL == pColInfoData) {
        code = TSDB_CODE_OUT_OF_RANGE;
        goto _error;
      }
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
      if (TSDB_CODE_SUCCESS != code) {
        goto _error;
      }
    }

    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
            *(int64_t*)pRow[2]);
  }

  (*pBlock)->info.window.ekey = lastTs;
  (*pBlock)->info.rows = numOfRows;

  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
  return TSDB_CODE_SUCCESS;

_error:
  // do not leave a dangling pointer behind for the caller
  blockDataDestroy(*pBlock);
  *pBlock = NULL;
  return code;
}
1255

1256
/*
 * Fetch callback of a sub query: `res` is the sub query request, rowNum the number of fetched rows.
 * The rows are turned into a data block which is handed to the next request (relation.nextRefId) through
 * continuePostSubQuery. On any failure the error is returned to the user instead.
 */
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
  SRequestObj* pRequest = (SRequestObj*)res;
  SSDataBlock* pBlock = NULL;

  if (TSDB_CODE_SUCCESS == pRequest->code) {
    pRequest->code = createResultBlock(res, rowNum, &pBlock);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self,
               pRequest->requestId, tstrerror(pRequest->code));
    }
  }

  if (TSDB_CODE_SUCCESS != pRequest->code) {
    returnToUser(pRequest);
    return;
  }

  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
  if (NULL == pNextReq) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.nextRefId, pRequest->requestId);
  } else {
    continuePostSubQuery(pNextReq, pBlock);
    (void)releaseRequest(pRequest->relation.nextRefId);
  }

  blockDataDestroy(pBlock);
}
1283

1284
/*
 * Continue with the request that follows a finished sub query.
 * For a query result the rows are fetched asynchronously first (postSubQueryFetchCb takes over);
 * otherwise the next request is continued directly without a data block.
 */
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
  SRequestObj* pRequest = pWrapper->pRequest;
  if (TD_RES_QUERY(pRequest)) {
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
    return;
  }

  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
  if (NULL == pNextReq) {
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.nextRefId, pRequest->requestId);
    return;
  }

  continuePostSubQuery(pNextReq, NULL);
  (void)releaseRequest(pRequest->relation.nextRefId);
}
1300

1301
// TODO: refactor the error code management
1302
/*
 * Completion callback of an asynchronously executed scheduler job.
 *
 * @param pResult  execution result, may be NULL; it is copied into the request and freed here
 * @param param    the SSqlCallbackWrapper of the request
 * @param code     execution result code
 *
 * Depending on the outcome the request is retried on the client, continued (csv insert / post sub query)
 * or handed back to the user through doRequestCallback.
 */
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
  SSqlCallbackWrapper* pWrapper = param;
  SRequestObj*         pRequest = pWrapper->pRequest;
  STscObj*             pTscObj = pRequest->pTscObj;

  // Note: This is EXECUTE completion callback, not FETCH callback.
  // Scheduler job phase is authoritative. Client phase is only fallback.
  // Let heartbeat read scheduler job phase via schedulerGetJobPhase().

  pRequest->code = code;
  if (NULL != pResult) {
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
  }

  int32_t reqType = pRequest->type;
  bool    isWriteReq =
      (TDMT_VND_SUBMIT == reqType) || (TDMT_VND_DELETE == reqType) || (TDMT_VND_CREATE_TABLE == reqType);
  if (isWriteReq) {
    if (NULL != pResult) {
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;

      // record the insert rows
      if (TDMT_VND_SUBMIT == reqType) {
        (void)atomic_add_fetch_64((int64_t*)&pTscObj->pAppInfo->summary.numOfInsertRows, pResult->numOfRows);
      }
    }
    // write requests do not keep their scheduler job
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  // the content has been copied into the request above
  taosMemoryFree(pResult);
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
           pRequest->requestId);

  bool retryOnClient = (TSDB_CODE_SUCCESS != code) && NEED_CLIENT_HANDLE_ERROR(code) &&
                       (NULL != pRequest->sqlstr) && (0 == pRequest->stmtBindVersion);
  if (retryOnClient) {
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
    restartAsyncQuery(pRequest, code);
    return;
  }

  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
  }

  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;

  // a failure while handling the response only replaces a successful request code
  int32_t rspCode = handleQueryExecRsp(pRequest);
  if (TSDB_CODE_SUCCESS == pRequest->code && TSDB_CODE_SUCCESS != rspCode) {
    pRequest->code = rspCode;
  }

  if (TSDB_CODE_SUCCESS == pRequest->code && NULL != pRequest->pQuery &&
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
    continueInsertFromCsv(pWrapper, pRequest);
    return;
  }

  if (pRequest->relation.nextRefId) {
    handlePostSubQuery(pWrapper);
    return;
  }

  destorySqlCallbackWrapper(pWrapper);
  pRequest->pWrapper = NULL;

  // return to client
  doRequestCallback(pRequest, code);
}
1376

1377
/*
 * Execute a parsed query synchronously according to its execution mode (local, rpc, schedule or empty
 * result) and post-process the execution result.
 *
 * @param pRequest   request being executed; pRequest->code is set when something failed
 * @param pQuery     parsed query, destroyed here unless keepQuery is true
 * @param keepQuery  whether pQuery stays alive after the call
 * @param res        optional output; when not NULL it takes over pRequest->body.resInfo.execRes.res
 */
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
  int32_t code = 0;

  if (NULL != pQuery->pRoot) {
    pRequest->stmtType = pQuery->pRoot->type;
    if (QUERY_NODE_DELETE_STMT == nodeType(pQuery->pRoot)) {
      pRequest->secureDelete = ((SDeleteStmt*)pQuery->pRoot)->secureDelete;
    }

    // request counters are only bumped on the first attempt, not on retries
    if (!pRequest->inRetry) {
      SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
      if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
      } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
      }
    }
  }

  pRequest->body.execMode = pQuery->execMode;
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      if (pRequest->validateOnly) {
        break;
      }
      if (NULL == pQuery->pRoot) {
        terrno = TSDB_CODE_INVALID_PARA;
        code = terrno;
      } else {
        code = execLocalCmd(pRequest, pQuery);
      }
      break;

    case QUERY_EXEC_MODE_RPC:
      if (!pRequest->validateOnly) {
        code = execDdlQuery(pRequest, pQuery);
      }
      break;

    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
      if (NULL == pMnodeList) {
        code = terrno;
        break;
      }

      SQueryPlan* pPlan = NULL;
      code = getPlan(pRequest, pQuery, &pPlan, pMnodeList);
      if (TSDB_CODE_SUCCESS == code) {
        pRequest->body.subplanNum = pPlan->numOfSubplans;
      }

      if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
        SArray* pExecNodes = NULL;
        code = buildSyncExecNodeList(pRequest, &pExecNodes, pMnodeList);

        if (TSDB_CODE_SUCCESS == code) {
          code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
                                      taosArrayGetSize(pExecNodes));
        }

        if (TSDB_CODE_SUCCESS == code) {
          code = scheduleQuery(pRequest, pPlan, pExecNodes);
        }
        taosArrayDestroy(pExecNodes);
      }

      taosArrayDestroy(pMnodeList);
      break;
    }

    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;

    default:
      break;
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    int rmCode = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
    if (TSDB_CODE_SUCCESS != rmCode) {
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, rmCode,
               pRequest->requestId);
    }
  }

  // the response is only post-processed when the execution itself succeeded
  if (TSDB_CODE_SUCCESS == code) {
    code = handleQueryExecRsp(pRequest);
  }

  if (TSDB_CODE_SUCCESS != code) {
    pRequest->code = code;
  }

  if (NULL != res) {
    // ownership of the execution result moves to the caller
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }
}
8,967,617✔
1475

1476
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
872,288,122✔
1477
                                 SSqlCallbackWrapper* pWrapper) {
1478
  int32_t code = TSDB_CODE_SUCCESS;
872,288,122✔
1479
  pRequest->type = pQuery->msgType;
872,288,122✔
1480
  SArray*     pMnodeList = NULL;
872,325,033✔
1481
  SArray*     pNodeList = NULL;
872,325,033✔
1482
  SQueryPlan* pDag = NULL;
872,284,420✔
1483
  int64_t     st = taosGetTimestampUs();
872,319,859✔
1484

1485
  if (!pRequest->parseOnly) {
872,319,859✔
1486
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_PLAN);
1,744,694,518✔
1487

1488
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
872,454,451✔
1489
    if (NULL == pMnodeList) {
872,272,084✔
UNCOV
1490
      code = terrno;
×
1491
    }
1492
    SPlanContext cxt = {.queryId = pRequest->requestId,
964,155,518✔
1493
                        .acctId = pRequest->pTscObj->acctId,
872,355,987✔
1494
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
872,386,025✔
1495
                        .pAstRoot = pQuery->pRoot,
872,407,692✔
1496
                        .showRewrite = pQuery->showRewrite,
872,408,626✔
1497
                        .isView = pWrapper->pParseCtx->isView,
872,380,113✔
1498
                        .isAudit = pWrapper->pParseCtx->isAudit,
872,392,911✔
1499
                        .privInfo = pWrapper->pParseCtx->privInfo,
872,337,387✔
1500
                        .pMsg = pRequest->msgBuf,
872,359,350✔
1501
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1502
                        .pUser = pRequest->pTscObj->user,
872,308,606✔
1503
                        .userId = pRequest->pTscObj->userId,
872,343,621✔
1504
                        .sysInfo = pRequest->pTscObj->sysInfo,
872,313,387✔
1505
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
872,377,642✔
1506
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
872,342,471✔
1507
    if (TSDB_CODE_SUCCESS == code) {
872,308,868✔
1508
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
872,319,424✔
1509
    }
1510
    if (code) {
872,266,949✔
1511
      tscError("req:0x%" PRIx64 " requestId:0x%" PRIx64 ", failed to create query plan, code:%s msg:%s", pRequest->self,
261,136✔
1512
               pRequest->requestId, tstrerror(code), cxt.pMsg);
1513
    } else {
1514
      pRequest->body.subplanNum = pDag->numOfSubplans;
872,005,813✔
1515
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
872,075,403✔
1516
    }
1517
  }
1518

1519
  pRequest->metric.execStart = taosGetTimestampUs();
872,308,791✔
1520
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
872,252,300✔
1521

1522
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
872,291,584✔
1523
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
871,769,957✔
1524
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_SCHEDULE);
408,285,762✔
1525
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
204,148,795✔
1526
    }
1527

1528
    if (code == TSDB_CODE_SUCCESS) {
871,884,877✔
1529
      code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
871,841,668✔
1530
                                  taosArrayGetSize(pNodeList));
871,821,689✔
1531
    }
1532

1533
    if (code == TSDB_CODE_SUCCESS) {
871,885,921✔
1534
      SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
871,882,801✔
1535
                               .requestId = pRequest->requestId,
871,899,085✔
1536
                               .requestObjRefId = pRequest->self};
871,826,280✔
1537
      SSchedulerReq    req = {
917,668,211✔
1538
             .syncReq = false,
1539
             .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
871,816,851✔
1540
             .pConn = &conn,
1541
             .pNodeList = pNodeList,
1542
             .pDag = pDag,
1543
             .allocatorRefId = pRequest->allocatorRefId,
871,816,851✔
1544
             .sql = pRequest->sqlstr,
871,800,406✔
1545
             .startTs = pRequest->metric.start,
871,835,584✔
1546
             .execFp = schedulerExecCb,
1547
             .cbParam = pWrapper,
1548
             .chkKillFp = chkRequestKilled,
1549
             .chkKillParam = (void*)pRequest->self,
871,865,769✔
1550
             .pExecRes = NULL,
1551
             .source = pRequest->source,
871,816,838✔
1552
             .secureDelete = pRequest->secureDelete,
871,815,649✔
1553
             .pWorkerCb = getTaskPoolWorkerCb(),
871,830,160✔
1554
      };
1555

1556
      if (TSDB_CODE_SUCCESS == code) {
871,773,700✔
1557
        CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_EXECUTE);
1,743,797,333✔
1558
        code = schedulerExecJob(&req, &pRequest->body.queryJob);
871,892,762✔
1559
      }
1560
      taosArrayDestroy(pNodeList);
871,801,032✔
1561
      taosArrayDestroy(pMnodeList);
871,925,412✔
1562
      return code;
871,912,580✔
1563
    }
1564
  }
1565

1566
  qDestroyQueryPlan(pDag);
545,381✔
1567
  tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
458,550✔
1568
           pRequest->requestId);
1569
  destorySqlCallbackWrapper(pWrapper);
458,550✔
1570
  pRequest->pWrapper = NULL;
458,550✔
1571
  if (TSDB_CODE_SUCCESS != code) {
458,550✔
1572
    pRequest->code = code;
264,256✔
1573
  }
1574

1575
  doRequestCallback(pRequest, code);
458,550✔
1576

1577
  // todo not to be released here
1578
  taosArrayDestroy(pMnodeList);
458,550✔
1579
  taosArrayDestroy(pNodeList);
458,550✔
1580

1581
  return code;
455,519✔
1582
}
1583

1584
/*
 * Dispatch a parsed query to the executor that matches its exec mode.
 *
 * Parse-only requests are completed immediately through the request callback. For every mode
 * other than QUERY_EXEC_MODE_SCHEDULE the callback wrapper is destroyed up front. The codes
 * returned by the DDL and scheduler paths are not propagated by this function.
 */
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
  if (pRequest->parseOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  pRequest->body.execMode = pQuery->execMode;
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
    destorySqlCallbackWrapper(pWrapper);
    pRequest->pWrapper = NULL;
  }

  // Per-application request counters; skipped on retries.
  if (pQuery->pRoot && !pRequest->inRetry) {
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    int64_t*            pCounter = NULL;

    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
      pCounter = (int64_t*)&pSummary->numOfInsertsReq;
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
      pCounter = (int64_t*)&pSummary->numOfQueryReq;
    }

    if (NULL != pCounter) {
      (void)atomic_add_fetch_64(pCounter, 1);
    }
  }

  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      asyncExecLocalCmd(pRequest, pQuery);
      break;

    case QUERY_EXEC_MODE_RPC:
      (void)asyncExecDdlQuery(pRequest, pQuery);
      break;

    case QUERY_EXEC_MODE_SCHEDULE:
      (void)asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
      break;

    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      doRequestCallback(pRequest, 0);
      break;

    default:
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
      doRequestCallback(pRequest, -1);
      break;
  }
}
1630

1631
/*
 * Refresh the catalog entries for every database and table recorded on the request.
 *
 * Returns TSDB_CODE_APP_ERROR when the request lists neither databases nor tables, the first
 * non-success code reported by the catalog otherwise, or TSDB_CODE_SUCCESS when every refresh
 * succeeded.
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  int32_t numOfTables = taosArrayGetSize(pRequest->tableList);
  if (numOfDbs <= 0 && numOfTables <= 0) {
    return TSDB_CODE_APP_ERROR;
  }

  SCatalog* pCatalog = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  for (int32_t dbIdx = 0; dbIdx < numOfDbs; ++dbIdx) {
    // catalogRefreshDBVgInfo will handle dbFName == null.
    char* dbFName = taosArrayGet(pRequest->dbList, dbIdx);
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  for (int32_t tbIdx = 0; tbIdx < numOfTables; ++tbIdx) {
    // catalogRefreshTableMeta will handle tableName == null.
    SName* pTableName = taosArrayGet(pRequest->tableList, tbIdx);
    code = catalogRefreshTableMeta(pCatalog, &conn, pTableName, -1);
    if (TSDB_CODE_SUCCESS != code) {
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
1673

1674
/*
 * Drop the cached catalog metadata for every name in tbList.
 *
 * pTscObj  connection object; supplies the cluster id used to look up the catalog
 * tbList   array of SName entries
 * isView   true: entries are views and are removed by db full name + view name;
 *          false: entries are tables
 *
 * Returns the catalog handle lookup error, otherwise TSDB_CODE_SUCCESS. Removal failures are
 * handled by TSC_ERR_RET (presumably an early return of the failing code - confirm).
 */
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
  int32_t   numOfNames = taosArrayGetSize(tbList);
  SCatalog* pCatalog = NULL;

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  if (!isView) {
    for (int32_t idx = 0; idx < numOfNames; ++idx) {
      SName* pTableName = taosArrayGet(tbList, idx);
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTableName));
    }
    return TSDB_CODE_SUCCESS;
  }

  for (int32_t idx = 0; idx < numOfNames; ++idx) {
    char   dbFullName[TSDB_DB_FNAME_LEN];
    SName* pView = taosArrayGet(tbList, idx);
    if (NULL != pView) {
      (void)tNameGetFullDbName(pView, dbFullName);
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFullName, 0, pView->tname, 0));
    }
  }

  return TSDB_CODE_SUCCESS;
}
1701

1702
/*
 * Initialise the mnode endpoint set from the configured first/second endpoints.
 *
 * firstEp / secondEp  "fqdn:port" style endpoint strings; NULL or empty strings are skipped
 * pEpSet              output; version is reset to 0 and the endpoint list is rebuilt
 *
 * An endpoint whose FQDN cannot be resolved is logged and dropped rather than treated as fatal.
 *
 * Returns 0 on success, TSDB_CODE_TSC_INVALID_FQDN when an endpoint string is too long or
 * firstEp cannot be parsed, the parser's code when secondEp cannot be parsed, and
 * TSDB_CODE_RPC_NETWORK_UNAVAIL when no endpoint could be resolved.
 */
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      // Return the error code itself (previously a bare -1), matching the secondEp check below.
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      // Clear the half-filled slot so the next endpoint can reuse it.
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }

    SIpAddr addr = {0};
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  return 0;
}
1761

1762
/*
 * Create a connection object, send the connect request to the mnode and wait for the reply.
 *
 * user, auth, db  credentials and optional default database for the new connection
 * totpCode        TOTP code forwarded in the connect request
 * fp, param       not used by this function
 * pAppInfo        application instance the connection belongs to
 * connType        connection type; CONN_TYPE__AUTH_TEST only verifies that connecting works
 * pTscObj         output; reset to NULL on entry. On a rejected connect and on a successful
 *                 AUTH_TEST the connection is closed and *pTscObj is reset to NULL again.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp,
                        void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
  *pTscObj = NULL;
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pRequest = NULL;
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    return code;
  }

  pRequest->sqlstr = taosStrdup("taos_connect");
  if (pRequest->sqlstr) {
    pRequest->sqlLen = strlen(pRequest->sqlstr);
  } else {
    // NOTE(review): this path releases neither pRequest nor *pTscObj - confirm whether cleanup is needed.
    return terrno;
  }

  SMsgSendInfo* body = NULL;
  code = buildConnectMsg(pRequest, &body, totpCode);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    return code;
  }

  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
  if (TSDB_CODE_SUCCESS != code) {
    destroyTscObj(*pTscObj);
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
    return code;
  }
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
    destroyTscObj(*pTscObj);
    tscError("failed to wait sem, code:%s", terrstr());
    return terrno;
  }
  if (pRequest->code != TSDB_CODE_SUCCESS) {
    // Select the message from the request's own result code. The local `code` is always
    // TSDB_CODE_SUCCESS at this point, so testing it made the FQDN branch unreachable.
    const char* errorMsg =
        (pRequest->code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
    tscError("failed to connect to server, reason: %s", errorMsg);

    terrno = pRequest->code;
    destroyRequest(pRequest);
    taos_close_internal(*pTscObj);
    *pTscObj = NULL;
    return terrno;
  }
  if (connType == CONN_TYPE__AUTH_TEST) {
    // An auth test only proves that the connect succeeded; the connection is not kept.
    terrno = TSDB_CODE_SUCCESS;
    destroyRequest(pRequest);
    taos_close_internal(*pTscObj);
    *pTscObj = NULL;
    return TSDB_CODE_SUCCESS;
  }

  tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
          (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
  destroyRequest(pRequest);
  return code;
}
1827

1828
/*
 * Allocate and fill the send-info for a TDMT_MND_CONNECT request.
 *
 * pRequest      connect request; supplies the ref id, request id and the connection object
 * pMsgSendInfo  output; on success points to a send-info whose msgInfo holds the serialized
 *               SConnectReq. On failure everything allocated here is released and
 *               *pMsgSendInfo is NULL.
 * totpCode      TOTP code copied into the request
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno value observed at the point of failure.
 */
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode) {
  int32_t     code = TSDB_CODE_SUCCESS;
  void*       pReq = NULL;
  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (*pMsgSendInfo == NULL) {
    return terrno;
  }

  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;

  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
  (*pMsgSendInfo)->requestId = pRequest->requestId;
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
  if (NULL == (*pMsgSendInfo)->param) {
    code = terrno;
    goto _err;
  }

  // The response handler receives the request's ref id through param.
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;

  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  } else if (terrno) {
    code = terrno;
    goto _err;
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = appInfo.pid;
  connectReq.startTime = appInfo.startTime;
  connectReq.totpCode = totpCode;
  connectReq.connectTime = taosGetTimestampMs();

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
  tstrncpy(connectReq.token, pObj->token, sizeof(connectReq.token));
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
  tSignConnectReq(&connectReq);

  // First pass with a NULL buffer yields the encoded length, second pass writes the payload.
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  pReq = taosMemoryMalloc(contLen);
  if (NULL == pReq) {
    code = terrno;
    goto _err;
  }

  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
    code = terrno;
    goto _err;
  }

  (*pMsgSendInfo)->msgInfo.len = contLen;
  (*pMsgSendInfo)->msgInfo.pData = pReq;
  return TSDB_CODE_SUCCESS;

_err:
  // Release everything allocated above; param used to leak on these paths.
  taosMemoryFree(pReq);
  taosMemoryFree((*pMsgSendInfo)->param);
  taosMemoryFree(*pMsgSendInfo);
  *pMsgSendInfo = NULL;
  return code;
}
1889

1890
/*
 * Apply an endpoint-set change reported with a response to the matching client-side cache.
 *
 * pSendInfo  send-info of the original message; its target type selects what is updated
 * pTscObj    connection the message belonged to; may be NULL, in which case nothing is updated
 * pMsg       response message, used for logging only
 * pEpSet     new endpoint set; NULL means nothing changed
 *
 * Mnode targets update the application's management endpoint set; vnode targets update the
 * catalog's vgroup endpoint set and then release target.dbFName. Other target types are only
 * logged.
 */
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (NULL == pEpSet) {
    return;
  }

  switch (pSendInfo->target.type) {
    case TARGET_TYPE_MNODE: {
      if (NULL == pTscObj) {
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
        return;
      }

      SEpSet prevEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
      SEp*   pPrevEp = &prevEpSet.eps[prevEpSet.inUse];
      SEp*   pCurEp = &pEpSet->eps[pEpSet->inUse];
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", prevEpSet.inUse,
               prevEpSet.numOfEps, pPrevEp->fqdn, pPrevEp->port, pEpSet->inUse, pEpSet->numOfEps, pCurEp->fqdn,
               pCurEp->port);
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      break;
    }

    case TARGET_TYPE_VNODE: {
      if (NULL == pTscObj) {
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
        return;
      }

      SCatalog* pCatalog = NULL;
      int32_t   rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
      if (TSDB_CODE_SUCCESS != rc) {
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
                 tstrerror(rc));
        return;
      }

      rc = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
      if (TSDB_CODE_SUCCESS != rc) {
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
                 tstrerror(rc));
        return;
      }

      // The db name is only released once the catalog update has succeeded.
      taosMemoryFreeClear(pSendInfo->target.dbFName);
      break;
    }

    default:
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
      break;
  }
}
1940

1941
/*
 * Deliver one server response to the handler registered in its send-info.
 *
 * pMsg    response message; pMsg->info.ahandle is the SMsgSendInfo of the original request.
 *         pMsg->pCont is released here on every path.
 * pEpSet  optional endpoint-set update. It is freed here on the early-error paths; otherwise it
 *         is passed to the handler through SDataBuf.pEpSet and not freed by this function.
 *
 * Returns TSDB_CODE_TSC_INTERNAL_ERROR when the message has no send-info or the acquired request
 * does not match the recorded ref id, TSDB_CODE_SUCCESS otherwise (the handler's own return
 * value is ignored).
 */
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (NULL == pSendInfo) {
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
    rpcFreeCont(pMsg->pCont);
    taosMemoryFree(pEpSet);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  STscObj*  pTscObj = NULL;
  STraceId* trace = &pMsg->info.traceId;
  char      tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));

  if (pSendInfo->requestObjRefId != 0) {
    // Hold a reference on the request while the handler runs; it is released further down.
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);

    if (NULL != pRequest && pRequest->self != pSendInfo->requestObjRefId) {
      tscError("doProcessMsgFromServer req:0x%" PRId64 " != pSendInfo->requestObjRefId:0x%" PRId64, pRequest->self,
               pSendInfo->requestObjRefId);

      if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
        tscError("doProcessMsgFromServer taosReleaseRef failed");
      }
      rpcFreeCont(pMsg->pCont);
      taosMemoryFree(pEpSet);
      destroySendMsgInfo(pSendInfo);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }

    if (NULL != pRequest) {
      pTscObj = pRequest->pTscObj;
    }
  }

  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

  SDataBuf buf = {.msgType = pMsg->msgType,
                  .len = pMsg->contLen,
                  .pData = NULL,
                  .handle = pMsg->info.handle,
                  .handleRefId = pMsg->info.refId,
                  .pEpSet = pEpSet};

  // The handler gets its own copy of the payload; the RPC buffer is released below.
  if (pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (NULL != buf.pData) {
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    } else {
      pMsg->code = terrno;
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  // A non-NULL pTscObj implies the request reference acquired above is still held.
  if (pTscObj) {
    int32_t rc = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (TSDB_CODE_SUCCESS != rc) {
      tscError("doProcessMsgFromServer taosReleaseRef failed");
      terrno = rc;
      pMsg->code = rc;
    }
  }

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
  return TSDB_CODE_SUCCESS;
}
2011

2012
int32_t doProcessMsgFromServer(void* param) {
2,147,483,647✔
2013
  AsyncArg* arg = (AsyncArg*)param;
2,147,483,647✔
2014
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
2,147,483,647✔
2015
  taosMemoryFree(arg);
2,147,483,647✔
2016
  return code;
2,147,483,647✔
2017
}
2018

2019
/*
 * RPC callback for responses from the server.
 *
 * Copies the optional endpoint set, normalises a few RPC error codes and queues the message
 * for processing via taosAsyncExec(). If any allocation or the queuing itself fails, the
 * message is processed inline on the calling thread with pMsg->code set to the failure code.
 *
 * parent  not used by this function
 * pMsg    response message
 * pEpSet  optional endpoint-set update; it is copied here, not retained
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  int32_t   code = 0;
  SEpSet*   pEpSetCopy = NULL;
  AsyncArg* pArg = NULL;

  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);

  if (pEpSet != NULL) {
    pEpSetCopy = taosMemoryCalloc(1, sizeof(SEpSet));
    if (NULL == pEpSetCopy) {
      code = terrno;
      pMsg->code = code;
      goto _exit;
    }
    (void)memcpy((void*)pEpSetCopy, (void*)pEpSet, sizeof(SEpSet));
  }

  // pMsg is response msg
  const bool isConnectRsp = (pMsg->msgType == TDMT_MND_CONNECT + 1);
  if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
    // Connect responses restore the origin code; everything else is unified to
    // TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED.
    pMsg->code = isConnectRsp ? TSDB_CODE_RPC_BROKEN_LINK : TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
  } else if (isConnectRsp && pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
    pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  pArg = taosMemoryCalloc(1, sizeof(AsyncArg));
  if (NULL == pArg) {
    code = terrno;
    pMsg->code = code;
    goto _exit;
  }

  pArg->msg = *pMsg;
  pArg->pEpset = pEpSetCopy;

  code = taosAsyncExec(doProcessMsgFromServer, pArg, NULL);
  if (0 == code) {
    return;
  }

  pMsg->code = code;
  taosMemoryFree(pArg);

_exit:
  // Could not hand the message to the async pool: handle it right here instead.
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
  code = doProcessMsgFromServerImpl(pMsg, pEpSetCopy);
  if (code != 0) {
    tscError("failed to sched msg to tsc, tsc ready quit");
  }
}
2074

2075
TAOS* taos_connect_totp(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
2,137✔
2076
                        uint16_t port) {
2077
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
2,137✔
2078
  if (user == NULL) {
2,137✔
UNCOV
2079
    user = TSDB_DEFAULT_USER;
×
2080
  }
2081

2082
  if (pass == NULL) {
2,137✔
UNCOV
2083
    pass = TSDB_DEFAULT_PASS;
×
2084
  }
2085

2086
  STscObj* pObj = NULL;
2,137✔
2087
  int32_t  code = taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__QUERY, &pObj);
2,137✔
2088
  if (TSDB_CODE_SUCCESS == code) {
2,137✔
2089
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1,445✔
2090
    if (NULL == rid) {
1,445✔
UNCOV
2091
      tscError("out of memory when taos_connect_totp to %s:%u, user:%s db:%s", ip, port, user, db);
×
UNCOV
2092
      return NULL;
×
2093
    }
2094
    *rid = pObj->id;
1,445✔
2095
    return (TAOS*)rid;
1,445✔
2096
  } else {
2097
    terrno = code;
692✔
2098
  }
2099

2100
  return NULL;
692✔
2101
}
2102

UNCOV
2103
int taos_connect_test(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
×
2104
                      uint16_t port) {
UNCOV
2105
  tscInfo("try to test connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
×
UNCOV
2106
  if (user == NULL) {
×
UNCOV
2107
    user = TSDB_DEFAULT_USER;
×
2108
  }
2109

UNCOV
2110
  if (pass == NULL) {
×
UNCOV
2111
    pass = TSDB_DEFAULT_PASS;
×
2112
  }
2113

2114
  STscObj* pObj = NULL;
×
UNCOV
2115
  return taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__AUTH_TEST, &pObj);
×
2116
}
2117

2118
TAOS* taos_connect_token(const char* ip, const char* token, const char* db, uint16_t port) {
2,494✔
2119
  tscInfo("try to connect to %s:%u by token, db:%s", ip, port, db);
2,494✔
2120

2121
  STscObj* pObj = NULL;
2,494✔
2122
  int32_t  code = taos_connect_by_auth(ip, NULL, token, NULL, db, port, CONN_TYPE__QUERY, &pObj);
2,494✔
2123
  if (TSDB_CODE_SUCCESS == code) {
2,494✔
2124
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1,157✔
2125
    if (NULL == rid) {
1,157✔
UNCOV
2126
      tscError("out of memory when taos_connect_token to %s:%u db:%s", ip, port, db);
×
UNCOV
2127
      return NULL;
×
2128
    }
2129
    *rid = pObj->id;
1,157✔
2130
    return (TAOS*)rid;
1,157✔
2131
  } else {
2132
    terrno = code;
1,337✔
2133
  }
2134

2135
  return NULL;
1,337✔
2136
}
2137

2138
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
100✔
2139
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
100✔
2140
  if (user == NULL) {
100✔
UNCOV
2141
    user = TSDB_DEFAULT_USER;
×
2142
  }
2143

2144
  if (auth == NULL) {
100✔
UNCOV
2145
    tscError("No auth info is given, failed to connect to server");
×
UNCOV
2146
    return NULL;
×
2147
  }
2148

2149
  STscObj* pObj = NULL;
100✔
2150
  int32_t  code = taos_connect_by_auth(ip, user, auth, NULL, db, port, CONN_TYPE__QUERY, &pObj);
100✔
2151
  if (TSDB_CODE_SUCCESS == code) {
100✔
2152
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
×
2153
    if (NULL == rid) {
×
UNCOV
2154
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2155
    }
UNCOV
2156
    *rid = pObj->id;
×
UNCOV
2157
    return (TAOS*)rid;
×
2158
  }
2159

2160
  return NULL;
100✔
2161
}
2162

2163
// Point row[] / length[] of the result info at the cells of the row indexed by
// pResultInfo->current. NULL cells get row[i] = NULL and length[i] = 0.
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    SResultColumn* pColumn = &pResultInfo->pCol[col];

    int32_t colType = pResultInfo->fields[col].type;
    int32_t colBytes = calcSchemaBytesFromTypeBytes(colType, pResultInfo->userFields[col].bytes, false);

    if (!IS_VAR_DATA_TYPE(colType)) {
      // fixed-width column: nullness comes from the bitmap, data is a flat array
      if (colDataIsNull_f(pColumn, pResultInfo->current)) {
        pResultInfo->row[col] = NULL;
        pResultInfo->length[col] = 0;
      } else {
        pResultInfo->row[col] = pColumn->pData + colBytes * pResultInfo->current;
        pResultInfo->length[col] = colBytes;
      }
      continue;
    }

    // variable-width column: an offset of -1 marks a NULL cell
    if (IS_VAR_NULL_TYPE(colType, colBytes) || pColumn->offset[pResultInfo->current] == -1) {
      pResultInfo->row[col] = NULL;
      pResultInfo->length[col] = 0;
      continue;
    }

    char* pCell = pColumn->offset[pResultInfo->current] + pColumn->pData;
    if (IS_STR_DATA_BLOB(colType)) {
      pResultInfo->length[col] = blobDataLen(pCell);
      pResultInfo->row[col] = blobDataVal(pCell);
    } else {
      pResultInfo->length[col] = varDataLen(pCell);
      pResultInfo->row[col] = varDataVal(pCell);
    }
  }
}
2,147,483,647✔
2196

UNCOV
2197
// Synchronously fetch the next piece of the result set.
//
// When the locally buffered block is used up (or nothing has been fetched yet), a blocking
// schedulerFetchRows() is issued and the response is unpacked with setQueryResultFromRsp().
// setupOneRowPtr : when true, row pointers are set up for the current row and the cursor advances
// convertUcs4    : forwarded to setQueryResultFromRsp()
//
// Returns pResultInfo->row, or NULL when pRequest is NULL, the result is completed,
// a fetch/unpack step fails (pRequest->code holds the code) or the new block has no rows.
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (pRequest == NULL) {
    return NULL;
  }

  SReqResultInfo* pInfo = &pRequest->body.resInfo;
  bool            needFetch = (pInfo->pData == NULL) || (pInfo->current >= pInfo->numOfRows);

  if (needFetch) {
    // All data has returned to App already, no need to try again
    if (pInfo->completed) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    SSchedulerReq fetchReq = {.syncReq = true, .pFetchRes = (void**)&pInfo->pData};

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &fetchReq);
    if (pRequest->code == TSDB_CODE_SUCCESS) {
      pRequest->code = setQueryResultFromRsp(pInfo, (const SRetrieveTableRsp*)pInfo->pData, convertUcs4,
                                             pRequest->stmtBindVersion > 0);
    }
    if (pRequest->code != TSDB_CODE_SUCCESS) {
      pInfo->numOfRows = 0;
      return NULL;
    }

    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
             ", complete:%d, QID:0x%" PRIx64,
             pRequest->self, pInfo->numOfRows, pInfo->totalRows, pInfo->completed, pRequest->requestId);

    // account the fetched payload in the per-cluster summary
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    (void)atomic_add_fetch_64((int64_t*)&pSummary->fetchBytes, pInfo->payloadLen);

    if (pInfo->numOfRows == 0) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pInfo);
    pInfo->current += 1;
  }

  return pInfo->row;
}
2246

2247
// Fetch-completion callback used by doAsyncFetchRows(): param is the semaphore the
// waiting thread blocks on; res and numOfRows are not used here.
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
  tsem_t* pSem = (tsem_t*)param;
  if (tsem_post(pSem) != TSDB_CODE_SUCCESS) {
    tscError("failed to post sem, code:%s", terrstr());
  }
}
236,447,732✔
2253

2254
// Fetch the next piece of the result set by running the asynchronous fetch and
// blocking on a local semaphore until its callback (syncFetchFn) fires.
//
// setupOneRowPtr : when true, row pointers are set up for the current row and the cursor advances
// convertUcs4    : stored in the result info before the fetch is issued
//
// Returns pResultInfo->row, or NULL when pRequest is NULL, the result is completed,
// the semaphore cannot be created, the fetched block is empty or pRequest->code is not success.
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (pRequest == NULL) {
    return NULL;
  }

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    // All data has returned to App already, no need to try again
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
      return NULL;
    }

    // convert ucs4 to native multi-bytes string
    pResultInfo->convertUcs4 = convertUcs4;
    tsem_t sem;
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
      tscError("failed to init sem, code:%s", terrstr());
      // Without a usable semaphore the fetch cannot be waited for; posting to or waiting on
      // an uninitialized semaphore is undefined, so fail the request instead of continuing.
      pRequest->code = (terrno != TSDB_CODE_SUCCESS) ? terrno : TSDB_CODE_TSC_INTERNAL_ERROR;
      pResultInfo->numOfRows = 0;
      return NULL;
    }
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
      tscError("failed to wait sem, code:%s", terrstr());
    }
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
      tscError("failed to destroy sem, code:%s", terrstr());
    }
    pRequest->inCallback = false;
  }

  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    return NULL;
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
  }

  return pResultInfo->row;
}
2295

2296
// Lazily allocate the per-column helper arrays (row, pCol, length, convertBuf) of a
// result info; each array has numOfCols zero-initialized entries.
//
// Nothing is done when pResInfo->row is already set. On allocation failure every array is
// released and its pointer reset to NULL, and the terrno value observed at that point is returned.
// Returns TSDB_CODE_SUCCESS otherwise.
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
  pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
  pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
  pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

  if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
    // capture the failure reason before any further library call can touch terrno
    int32_t code = terrno;

    // Reset the pointers as well: leaving them dangling would make the next call treat the
    // freed 'row' as a valid allocation and would lead to a double free on cleanup.
    taosMemoryFreeClear(pResInfo->row);
    taosMemoryFreeClear(pResInfo->pCol);
    taosMemoryFreeClear(pResInfo->length);
    taosMemoryFreeClear(pResInfo->convertBuf);
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2314

2315
// Convert every non-empty NCHAR column of the current block from UCS-4 to the multi-byte
// charset, writing the converted var-strings into convertBuf[i] and redirecting the column's
// offsets, pData and row[i] to that buffer.
//
// colLength : per-column data length of the block, indexed like the columns
// isStmt    : forwarded to calcSchemaBytesFromTypeBytes()
//
// Returns TSDB_CODE_SUCCESS, terrno when a buffer cannot be (re)allocated, or
// TSDB_CODE_TSC_INTERNAL_ERROR when no converter is available or a converted value fails
// the length checks.
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
  int32_t convIdx = -1;
  iconv_t cd = taosAcquireConv(&convIdx, C2M, pResultInfo->charsetCxt);
  if (cd == (iconv_t)-1) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    int32_t colType = pResultInfo->fields[col].type;
    int32_t maxBytes =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[col].type, pResultInfo->fields[col].bytes, isStmt);

    if (colType != TSDB_DATA_TYPE_NCHAR || colLength[col] <= 0) {
      continue;
    }

    char* pOut = taosMemoryRealloc(pResultInfo->convertBuf[col], colLength[col]);
    if (pOut == NULL) {
      taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
      return terrno;
    }
    pResultInfo->convertBuf[col] = pOut;

    // pOut is the write cursor inside convertBuf[col]; it advances one var-string per non-NULL row
    SResultColumn* pColumn = &pResultInfo->pCol[col];
    for (int32_t row = 0; row < pResultInfo->numOfRows; ++row) {
      if (pColumn->offset[row] == -1) {
        continue;  // NULL cell, nothing to convert
      }

      char*   pSrc = pColumn->offset[row] + pColumn->pData;
      int32_t mbsLen = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pSrc), varDataLen(pSrc), varDataVal(pOut), cd);

      bool invalid = (mbsLen < 0) || (mbsLen > maxBytes) ||
                     ((pOut + mbsLen) >= (pResultInfo->convertBuf[col] + colLength[col]));
      if (invalid) {
        tscError(
            "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
            "colLength[i]):%p",
            mbsLen, maxBytes, (pOut + mbsLen), (pResultInfo->convertBuf[col] + colLength[col]));
        taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      varDataSetLen(pOut, mbsLen);
      pColumn->offset[row] = (pOut - pResultInfo->convertBuf[col]);
      pOut += (mbsLen + VARSTR_HEADER_SIZE);
    }

    pColumn->pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pColumn->pData;
  }

  taosReleaseConv(convIdx, cd, C2M, pResultInfo->charsetCxt);
  return TSDB_CODE_SUCCESS;
}
2362

2363
// Render every decimal column that has data as text: each value is formatted with
// decimalToStr() into a fixed 64-byte slot of convertBuf[i], the reported field width
// (internal and user-visible) is set to 64, and pData / row[i] are redirected to the buffer.
//
// Returns 0 on success, terrno when the buffer cannot be (re)allocated, or the code
// reported by decimalToStr().
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
  const int32_t slotLen = 64;  // bytes reserved for one formatted decimal value

  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    TAOS_FIELD_E* pInternalField = &pResultInfo->fields[col];
    TAOS_FIELD*   pUserField = &pResultInfo->userFields[col];
    int32_t       colType = pInternalField->type;

    if (!IS_DECIMAL_TYPE(colType) || pResultInfo->pCol[col].pData == NULL) {
      continue;
    }

    char* pOut = taosMemoryRealloc(pResultInfo->convertBuf[col], slotLen * pResultInfo->numOfRows);
    pInternalField->bytes = slotLen;
    pUserField->bytes = slotLen;
    if (pOut == NULL) {
      return terrno;
    }
    pResultInfo->convertBuf[col] = pOut;

    for (int32_t row = 0; row < pResultInfo->numOfRows; ++row, pOut += slotLen) {
      const char* pRaw = pResultInfo->pCol[col].pData + row * tDataTypes[colType].bytes;
      int32_t     code =
          decimalToStr((DecimalWord*)pRaw, colType, pInternalField->precision, pInternalField->scale, pOut, slotLen);
      if (code != TSDB_CODE_SUCCESS) {
        return code;
      }
    }

    pResultInfo->pCol[col].pData = pResultInfo->convertBuf[col];
    pResultInfo->row[col] = pResultInfo->pCol[col].pData;
  }

  return 0;
}
2394

2395
// Size in bytes of the leading metadata of a data block for numOfCols columns:
// two int32 fields, three more int32 fields, one uint64 field, then one
// (int8, int32) pair per column. The block pointer p is not inspected.
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
  (void)p;

  const size_t fixedBytes = sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t);
  const size_t perColumnBytes = sizeof(int8_t) + sizeof(int32_t);

  return fixedBytes + numOfCols * perColumnBytes;
}
2399

2400
// Estimate how many bytes the current block needs once its JSON columns have been
// rewritten as text (see doConvertJson). Non-JSON columns are counted at their size in the
// block; for a JSON column the larger of the original length and the estimated text length is used.
//
// Returns the estimated size (> 0), or a value <= 0 on error: TSDB_CODE_TSC_INTERNAL_ERROR
// when the column count in the block differs from numOfCols, -1 for an unknown JSON inner type.
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
  // doConvertJson formats doubles with "%.9lf": a finite double may need up to
  // 1 (sign) + 309 (integer digits) + 1 ('.') + 9 (fraction digits) characters,
  // far more than the 32 bytes that were reserved before.
  enum { JSON_DOUBLE_STR_MAX_LEN = 320 };

  char*   p = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)p;

  int32_t numOfRows = pResultInfo->numOfRows;
  int32_t numOfCols = pResultInfo->numOfCols;

  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
  // length |
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
  if (numOfCols != cols) {
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
  int32_t* colLength = (int32_t*)(p + len);
  len += sizeof(int32_t) * numOfCols;

  char* pStart = p + len;
  for (int32_t i = 0; i < numOfCols; ++i) {
    // version-1 blocks store the column lengths byte-swapped
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];

    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      int32_t* offset = (int32_t*)pStart;
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
      len += lenTmp;
      pStart += lenTmp;

      int32_t estimateColLen = 0;
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {
          continue;  // NULL cell
        }
        char* data = offset[j] + pStart;

        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
        } else if (tTagIsJson(data)) {
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          estimateColLen += (VARSTR_HEADER_SIZE + JSON_DOUBLE_STR_MAX_LEN);
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
        } else if (IS_STR_DATA_BLOB(jsonInnerType)) {
          estimateColLen += (BLOBSTR_HEADER_SIZE + 32);
        } else {
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
          return -1;
        }
      }
      len += TMAX(colLen, estimateColLen);
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      int32_t lenTmp = numOfRows * sizeof(int32_t);
      len += (lenTmp + colLen);
      pStart += lenTmp;
    } else {
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
      len += (lenTmp + colLen);
      pStart += lenTmp;
    }
    pStart += colLen;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  len += sizeof(bool);
  return len;
}
2473

2474
// Rewrite the current block so that every JSON column holds printable var-strings.
//
// When the result has no JSON column nothing is done. Otherwise a new buffer sized by
// estimateJsonLen() is allocated in pResultInfo->convertJson, the block header, column
// lengths and all non-JSON columns are copied verbatim, each JSON cell is rendered as text
// (null literal, JSON object string, quoted/escaped string, "%.9lf" double, true/false), the
// total length field of the new block is updated and pResultInfo->pData is switched to it.
//
// Returns TSDB_CODE_SUCCESS, terrno on allocation / tag-to-JSON failure, or
// TSDB_CODE_TSC_INTERNAL_ERROR on inconsistent input or when the rendered data would not fit
// into the estimated buffer.
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
  int32_t numOfRows = pResultInfo->numOfRows;
  int32_t numOfCols = pResultInfo->numOfCols;
  bool    needConvert = false;
  for (int32_t i = 0; i < numOfCols; ++i) {
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      needConvert = true;
      break;
    }
  }

  if (!needConvert) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to convert form json format string");

  char*   p = (char*)pResultInfo->pData;
  int32_t blockVersion = *(int32_t*)p;
  int32_t dataLen = estimateJsonLen(pResultInfo);
  if (dataLen <= 0) {
    tscError("doConvertJson error: estimateJsonLen failed");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->convertJson);
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
  if (pResultInfo->convertJson == NULL) return terrno;
  char* p1 = pResultInfo->convertJson;

  int32_t totalLen = 0;
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
  if (numOfCols != cols) {
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // copy the block meta data
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
  (void)memcpy(p1, p, len);

  p += len;
  p1 += len;
  totalLen += len;

  // copy the per-column length array; JSON entries are overwritten further down
  len = sizeof(int32_t) * numOfCols;
  int32_t* colLength = (int32_t*)p;
  int32_t* colLength1 = (int32_t*)p1;
  (void)memcpy(p1, p, len);
  p += len;
  p1 += len;
  totalLen += len;

  char* pStart = p;    // read cursor in the original block
  char* pStart1 = p1;  // write cursor in the converted block
  for (int32_t i = 0; i < numOfCols; ++i) {
    // version-1 blocks store the column lengths byte-swapped
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
    if (colLen < 0 || colLen >= dataLen) {
      // a negative length would move both cursors backwards
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
      int32_t* offset = (int32_t*)pStart;
      int32_t* offset1 = (int32_t*)pStart1;
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;

      // from here on 'len' counts the bytes of converted data written for this column
      len = 0;
      for (int32_t j = 0; j < numOfRows; ++j) {
        if (offset[j] == -1) {
          continue;  // NULL cell, offset already copied as -1
        }
        char* data = offset[j] + pStart;

        int32_t jsonInnerType = *data;
        char*   jsonInnerData = data + CHAR_BYTES;
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (tTagIsJson(data)) {
          char* jsonString = NULL;
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
          if (jsonString == NULL) {
            tscError("doConvertJson error: parseTagDatatoJson failed");
            return terrno;
          }
          STR_TO_VARSTR(dst, jsonString);
          taosMemoryFree(jsonString);
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          *(char*)varDataVal(dst) = '\"';
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(tmp),
                                         pResultInfo->charsetCxt);
          if (length <= 0) {
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
            length = 0;
          }
          // the escaped text starts after the varstr header and the opening quote and must
          // leave one byte for the closing quote, all inside dst
          int32_t escapeLength =
              escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE - CHAR_BYTES * 2,
                              varDataVal(tmp), length);
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
          // per-value trace, not an error condition
          tscDebug("value:%s.", varDataVal(dst));
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
          double jsonVd = *(double*)(jsonInnerData);
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
          varDataSetLen(dst, strlen(varDataVal(dst)));
        } else {
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        // never write past the buffer that was sized from the estimate
        int32_t dstLen = varDataTLen(dst);
        int64_t needLen = (int64_t)totalLen + len + dstLen;
        if (needLen > dataLen) {
          tscError("doConvertJson error: converted data length:%" PRId64 " exceeds estimated dataLen:%d", needLen,
                   dataLen);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        offset1[j] = len;
        (void)memcpy(pStart1 + len, dst, dstLen);
        len += dstLen;
      }
      colLen1 = len;
      totalLen += colLen1;
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      // offsets followed by the unchanged var data
      len = numOfRows * sizeof(int32_t);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    } else {
      // null bitmap followed by the unchanged fixed-width data
      len = BitmapLen(pResultInfo->numOfRows);
      (void)memcpy(pStart1, pStart, len);
      pStart += len;
      pStart1 += len;
      totalLen += len;
      totalLen += colLen;
      (void)memcpy(pStart1, pStart, colLen);
    }
    pStart += colLen;
    pStart1 += colLen1;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  // (void)memcpy(pStart1, pStart, sizeof(bool));
  totalLen += sizeof(bool);

  // second int32 of the block header is the total length
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
  pResultInfo->pData = pResultInfo->convertJson;
  return TSDB_CODE_SUCCESS;
}
2631

2632
// Parse the data block in pResultInfo->pData and wire up the per-column pointers
// (offset / nullbitmap, pData, length[], row[]) of the result info.
//
// convertUcs4 : when true, NCHAR columns are converted (unless DISALLOW_NCHAR_WITHOUT_ICONV
//               is defined) and decimal columns are rendered as text afterwards
// isStmt      : forwarded to calcSchemaBytesFromTypeBytes() / doConvertUCS4()
//
// Returns TSDB_CODE_SUCCESS (also when the block has no rows), the code of a failing helper,
// or TSDB_CODE_TSC_INTERNAL_ERROR when the arguments or the block contents are inconsistent.
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
  bool convertForDecimal = convertUcs4;
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
    tscError("setResultDataPtr paras error");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pResultInfo->numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pResultInfo->pData == NULL) {
    tscError("setResultDataPtr error: pData is NULL");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  // may replace pResultInfo->pData with a converted copy of the block
  code = doConvertJson(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  char* p = (char*)pResultInfo->pData;

  // version:
  int32_t blockVersion = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t dataLen = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t rows = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t cols = *(int32_t*)p;
  p += sizeof(int32_t);

  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // skip the flag segment (int32) and the block group id (uint64); neither is used here
  p += sizeof(int32_t);
  p += sizeof(uint64_t);

  // check fields
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    int8_t type = *(int8_t*)p;
    p += sizeof(int8_t);

    int32_t bytes = *(int32_t*)p;
    p += sizeof(int32_t);

    // fill in precision/scale from the block schema when the field does not carry them yet
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
    }
  }

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * pResultInfo->numOfCols;

  char* pStart = p;
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    if ((pStart - pResultInfo->pData) >= dataLen) {
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (blockVersion == BLOCK_VERSION_1) {
      // version-1 blocks store the column lengths byte-swapped; fix them up in place
      colLength[i] = htonl(colLength[i]);
    }
    // a negative length would move the cursor backwards, so reject it together with oversized ones
    if (colLength[i] < 0 || colLength[i] >= dataLen) {
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
      tscError("invalid type %d", pResultInfo->fields[i].type);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      // variable-width column: one int32 offset per row precedes the data
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
    } else {
      // fixed-width column: a null bitmap precedes the data
      pResultInfo->pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
    pResultInfo->length[i] =
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
  }

  p = pStart;
  // bool blankFill = *(bool*)p;
  p += sizeof(bool);
  int32_t offset = p - pResultInfo->pData;
  if (offset > dataLen) {
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
  }
#endif
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
    code = convertDecimalType(pResultInfo);
  }
  return code;
}
2752

2753
// Return a newly allocated copy of the connection's current database name, taken
// while holding the connection mutex. terrno is reset to success on entry.
// Returns NULL when no database is set or when the copy cannot be allocated.
char* getDbOfConnection(STscObj* pObj) {
  terrno = TSDB_CODE_SUCCESS;

  char* pDbCopy = NULL;
  (void)taosThreadMutexLock(&pObj->mutex);

  bool hasDb = (strlen(pObj->db) > 0);
  if (hasDb) {
    pDbCopy = taosStrndup(pObj->db, tListLen(pObj->db));
    if (pDbCopy == NULL) {
      tscError("failed to taosStrndup db name");
    }
  }

  (void)taosThreadMutexUnlock(&pObj->mutex);
  return pDbCopy;
}
2768

2769
// Store db as the connection's current database name (bounded copy into pTscObj->db),
// under the connection mutex. Logs an error and does nothing when either argument is NULL.
void setConnectionDB(STscObj* pTscObj, const char* db) {
  if (pTscObj != NULL && db != NULL) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
    return;
  }

  tscError("setConnectionDB para is NULL");
}
2779

UNCOV
2780
// Clear the connection's current database name (sets it to the empty string)
// under pTscObj->mutex. A NULL connection is a no-op.
void resetConnectDB(STscObj* pTscObj) {
  if (NULL != pTscObj) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = 0;
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
  }
}
2789

2790
// Bind a retrieve response to pResultInfo and set up its data pointers.
//
// Steps:
//   1. release the previously held response (pRspMsg) and store pRsp instead;
//   2. copy row count / completed flag / precision out of the response header;
//   3. when the response is flagged compressed, make sure decompBuf can hold
//      payloadLen bytes and decompress the block into it; otherwise point
//      pData straight at the payload inside pRsp;
//   4. add numOfRows to totalRows and call setResultDataPtr().
//
// pResultInfo : result state to update (must not be NULL)
// pRsp        : response message; stored in pResultInfo->pRspMsg
// convertUcs4 : forwarded to setResultDataPtr()
// isStmt      : forwarded to setResultDataPtr()
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR for NULL arguments
// or inconsistent length fields, terrno on allocation failure, or whatever
// setResultDataPtr() returns.
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
                              bool isStmt) {
  if (pResultInfo == NULL || pRsp == NULL) {
    tscError("setQueryResultFromRsp paras is null");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->pRspMsg);
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->precision = pRsp->precision;

  // decompress data if needed
  int32_t payloadLen = htonl(pRsp->payloadLen);

  if (pRsp->compressed) {
    if (pResultInfo->decompBuf == NULL) {
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
      if (pResultInfo->decompBuf == NULL) {
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
        return terrno;
      }
      pResultInfo->decompBufSize = payloadLen;
    } else if (pResultInfo->decompBufSize < payloadLen) {
      // grow through a temporary so the old buffer is not lost on failure
      char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
      if (p == NULL) {
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
        return terrno;
      }

      pResultInfo->decompBuf = p;
      pResultInfo->decompBufSize = payloadLen;
    }
  }

  if (payloadLen > 0) {
    // the payload starts with two int32 fields: compressed length, raw length
    int32_t compLen = *(int32_t*)pRsp->data;
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));

    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;

    if (pRsp->compressed && compLen < rawLen) {
      // decompBuf was sized from payloadLen, but the decompressor is told to
      // produce rawLen bytes; refuse lengths that would overrun the buffer
      if (compLen < 0 || rawLen > pResultInfo->decompBufSize) {
        tscError("invalid compressed block, compLen:%d, rawLen:%d, bufSize:%" PRId64, compLen, rawLen,
                 (int64_t)pResultInfo->decompBufSize);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      if (len < 0) {
        tscError("tsDecompressString failed");
        return terrno ? terrno : TSDB_CODE_FAILED;
      }
      if (len != rawLen) {
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pResultInfo->pData = pResultInfo->decompBuf;
      pResultInfo->payloadLen = rawLen;
    } else {
      // uncompressed payload: use it in place
      int32_t rspCompLen = htonl(pRsp->compLen);

      pResultInfo->pData = pStart;
      pResultInfo->payloadLen = rspCompLen;
      if (rspCompLen != payloadLen) {
        // report host-order values; the raw header fields are network order
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", rspCompLen, payloadLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
    }
  }

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
  return code;
}
2863

2864
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
201✔
2865
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
201✔
2866
  void*              clientRpc = NULL;
201✔
2867
  SServerStatusRsp   statusRsp = {0};
201✔
2868
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
201✔
2869
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
201✔
2870
  SRpcMsg  rpcRsp = {0};
201✔
2871
  SRpcInit rpcInit = {0};
201✔
2872
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
201✔
2873

2874
  rpcInit.label = "CHK";
201✔
2875
  rpcInit.numOfThreads = 1;
201✔
2876
  rpcInit.cfp = NULL;
201✔
2877
  rpcInit.sessions = 16;
201✔
2878
  rpcInit.connType = TAOS_CONN_CLIENT;
201✔
2879
  rpcInit.idleTime = tsShellActivityTimer * 1000;
201✔
2880
  rpcInit.compressSize = tsCompressMsgSize;
201✔
2881
  rpcInit.user = "_dnd";
201✔
2882

2883
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
201✔
2884
  connLimitNum = TMAX(connLimitNum, 10);
201✔
2885
  connLimitNum = TMIN(connLimitNum, 500);
201✔
2886
  rpcInit.connLimitNum = connLimitNum;
201✔
2887
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
201✔
2888
  rpcInit.readTimeout = tsReadTimeout;
201✔
2889
  rpcInit.ipv6 = tsEnableIpv6;
201✔
2890
  rpcInit.enableSSL = tsEnableTLS;
201✔
2891

2892
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
201✔
2893
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
201✔
2894
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
201✔
2895
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
201✔
2896
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
201✔
2897

2898
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
201✔
UNCOV
2899
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
UNCOV
2900
    goto _OVER;
×
2901
  }
2902

2903
  clientRpc = rpcOpen(&rpcInit);
201✔
2904
  if (clientRpc == NULL) {
201✔
UNCOV
2905
    code = terrno;
×
2906
    tscError("failed to init server status client since %s", tstrerror(code));
×
2907
    goto _OVER;
×
2908
  }
2909

2910
  if (fqdn == NULL) {
201✔
2911
    fqdn = tsLocalFqdn;
201✔
2912
  }
2913

2914
  if (port == 0) {
201✔
2915
    port = tsServerPort;
201✔
2916
  }
2917

2918
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
201✔
2919
  epSet.eps[0].port = (uint16_t)port;
201✔
2920
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
201✔
2921
  if (TSDB_CODE_SUCCESS != ret) {
201✔
UNCOV
2922
    tscError("failed to send recv since %s", tstrerror(ret));
×
UNCOV
2923
    goto _OVER;
×
2924
  }
2925

2926
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
201✔
2927
    tscError("failed to send server status req since %s", terrstr());
48✔
2928
    goto _OVER;
48✔
2929
  }
2930

2931
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
153✔
UNCOV
2932
    tscError("failed to parse server status rsp since %s", terrstr());
×
UNCOV
2933
    goto _OVER;
×
2934
  }
2935

2936
  code = statusRsp.statusCode;
153✔
2937
  if (details != NULL) {
153✔
2938
    tstrncpy(details, statusRsp.details, maxlen);
153✔
2939
  }
2940

2941
_OVER:
187✔
2942
  if (clientRpc != NULL) {
201✔
2943
    rpcClose(clientRpc);
201✔
2944
  }
2945
  if (rpcRsp.pCont != NULL) {
201✔
2946
    rpcFreeCont(rpcRsp.pCont);
153✔
2947
  }
2948
  return code;
201✔
2949
}
2950

2951
// Add one table name to the per-database request map.
//
// The name is taken from str: when len2 > 0 the text at pos1/len1 is the
// database and pos2/len2 the table; otherwise pos1/len1 is the table and db
// supplies the database. Tables are grouped in pHash under the key
// "<acctId>.<dbName>", each entry being an STablesReq whose pTables array
// collects the SName values.
//
// Returns TSDB_CODE_SUCCESS, -1 for an empty/invalid name, or an error code
// when an allocation or hash insertion fails.
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
                      int32_t acctId, char* db) {
  SName name = {0};

  if (len1 <= 0) {
    return -1;
  }

  const char* dbName = db;
  const char* tbName = NULL;
  int32_t     dbLen = 0;
  int32_t     tbLen = 0;
  if (len2 > 0) {
    dbName = str + pos1;
    dbLen = len1;
    tbName = str + pos2;
    tbLen = len2;
  } else {
    // unqualified table name: the default database is required
    if (db == NULL) {
      return -1;
    }
    dbLen = strlen(db);
    tbName = str + pos1;
    tbLen = len1;
  }

  if (dbLen <= 0 || tbLen <= 0) {
    return -1;
  }

  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
    return -1;
  }

  if (tNameAddTbName(&name, tbName, tbLen)) {
    return -1;
  }

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);

  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
  if (pDb) {
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  // first table for this database: create a new entry
  STablesReq newReq = {0};
  newReq.pTables = taosArrayInit(20, sizeof(SName));
  if (NULL == newReq.pTables) {
    return terrno;
  }
  tstrncpy(newReq.dbFName, dbFName, TSDB_DB_FNAME_LEN);

  if (NULL == taosArrayPush(newReq.pTables, &name)) {
    // capture the error before cleanup, then release the array that never
    // reached the hash
    int32_t pushCode = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    taosArrayDestroy(newReq.pTables);
    return pushCode;
  }

  int32_t putCode = taosHashPut(pHash, dbFName, strlen(dbFName), &newReq, sizeof(newReq));
  if (putCode != TSDB_CODE_SUCCESS) {
    // the hash did not take the entry, so the array is still ours to free
    taosArrayDestroy(newReq.pTables);
    terrno = putCode;
    return putCode;
  }

  return TSDB_CODE_SUCCESS;
}
3009

3010
// Parse a comma-separated list of table names into per-database requests.
//
// Accepted syntax for each item is `tb` or `db.tb`. A name part is either a
// run of [A-Za-z0-9_] or any text enclosed in backticks; blanks, tabs, CR and
// LF may follow a name part. Unqualified names use dbName as their database.
//
// tbList : NUL-terminated list text
// acctId : account id used to build the full database name
// dbName : default database for unqualified names
// pReq   : on success receives a new array of STablesReq (one per database);
//          it is left untouched on failure
//
// Returns TSDB_CODE_SUCCESS, or an error code: terrno from the initial hash
// allocation, otherwise TSDB_CODE_TSC_INVALID_OPERATION for every later
// failure (also stored in terrno).
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return terrno;
  }

  bool    inEscape = false;
  int32_t code = 0;
  void*   pIter = NULL;
  SArray* pResult = NULL;

  // vPos/vLen describe up to two name parts (index 0, then index 1 after a
  // '.'); a position of -1 means "not started", a length of 0 "not finished"
  int32_t vIdx = 0;
  int32_t vPos[2];
  int32_t vLen[2];

  (void)memset(vPos, -1, sizeof(vPos));
  (void)memset(vLen, 0, sizeof(vLen));

  for (int32_t i = 0;; ++i) {
    if (0 == *(tbList + i)) {
      // end of input: close the open part and flush the last item
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      break;
    }

    if ('`' == *(tbList + i)) {
      inEscape = !inEscape;
      if (!inEscape) {
        // closing backtick: an empty escaped name is rejected
        if (vPos[vIdx] >= 0) {
          vLen[vIdx] = i - vPos[vIdx];
        } else {
          goto _return;
        }
      }

      continue;
    }

    if (inEscape) {
      // any character is allowed inside backticks
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    if ('.' == *(tbList + i)) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      vIdx++;
      // at most one '.' per item (db.tb)
      if (vIdx >= 2) {
        goto _return;
      }
      continue;
    }

    if (',' == *(tbList + i)) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      // start a fresh item
      (void)memset(vPos, -1, sizeof(vPos));
      (void)memset(vLen, 0, sizeof(vLen));
      vIdx = 0;
      continue;
    }

    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
      // whitespace ends the current part, if one is open
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      continue;
    }

    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
      // a name character after the part was already closed is an error
      if (vLen[vIdx] > 0) {
        goto _return;
      }
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    goto _return;
  }

  int32_t dbNum = taosHashGetSize(pHash);
  pResult = taosArrayInit(dbNum, sizeof(STablesReq));
  if (NULL == pResult) {
    goto _return;
  }

  // copy each per-database entry into the output array; the pTables arrays
  // are shared with the copies, so only the hash container is released below
  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    if (NULL == taosArrayPush(pResult, pDb)) {
      goto _return;
    }
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  *pReq = pResult;
  return TSDB_CODE_SUCCESS;

_return:

  terrno = TSDB_CODE_TSC_INVALID_OPERATION;

  // drop the partially built output container first; its elements only alias
  // the pTables arrays that are destroyed through the hash below
  if (pResult != NULL) {
    taosArrayDestroy(pResult);
  }

  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    taosArrayDestroy(pDb->pTables);
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  return terrno;
}
3148

3149
// Catalog completion callback for synchronous callers: records code on the
// request held in param (an SSyncQueryParam) and posts its semaphore.
// pResult is not used here.
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
  SSyncQueryParam* pSync = (SSyncQueryParam*)param;

  pSync->pRequest->code = code;

  int32_t postCode = tsem_post(&pSync->sem);
  if (postCode != TSDB_CODE_SUCCESS) {
    tscError("failed to post semaphore since %s", tstrerror(terrno));
  }
}
1,252✔
3157

3158
void syncQueryFn(void* param, void* res, int32_t code) {
1,083,502,684✔
3159
  SSyncQueryParam* pParam = param;
1,083,502,684✔
3160
  pParam->pRequest = res;
1,083,502,684✔
3161

3162
  if (pParam->pRequest) {
1,083,514,854✔
3163
    pParam->pRequest->code = code;
1,083,498,365✔
3164
    clientOperateReport(pParam->pRequest);
1,083,512,212✔
3165
  }
3166

3167
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
1,083,456,413✔
UNCOV
3168
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3169
  }
3170
}
1,083,553,127✔
3171

3172
// Start an asynchronous query on connection connId.
//
// Validation failures are reported by setting terrno and invoking
// fp(param, NULL, terrno) (when fp itself is usable):
//   - sql or fp is NULL          -> TSDB_CODE_INVALID_PARA
//   - sql longer than the limit  -> TSDB_CODE_TSC_EXCEED_SQL_LIMIT
//   - buildRequest / connCheckAndUpateMetric failure -> their error code
// Otherwise the request records source and fp and is handed to doAsyncQuery.
//
// NOTE(review): when connCheckAndUpateMetric fails, the request created by
// buildRequest is not released in this function -- confirm who owns it.
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                        int8_t source) {
  if (NULL == fp) {
    // nobody to notify
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)tsMaxSQLLength) {
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, tsMaxSQLLength);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
  if (TSDB_CODE_SUCCESS == code) {
    code = connCheckAndUpateMetric(connId);
  }

  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->source = source;
  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
}
3212

3213
// Start an asynchronous query on connection connId using a caller-supplied
// request id.
//
// Validation failures are reported by setting terrno and invoking
// fp(param, NULL, terrno) (when fp itself is usable):
//   - sql or fp is NULL          -> TSDB_CODE_INVALID_PARA
//   - sql longer than the limit  -> TSDB_CODE_TSC_EXCEED_SQL_LIMIT
//   - buildRequest / connCheckAndUpateMetric failure -> their error code
// Otherwise the request records fp and is handed to doAsyncQuery.
//
// NOTE(review): when connCheckAndUpateMetric fails, the request created by
// buildRequest is not released in this function -- confirm who owns it.
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                                 int64_t reqid) {
  if (NULL == fp) {
    // nobody to notify
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)tsMaxSQLLength) {
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid, tsMaxSQLLength);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);

  SRequestObj* pRequest = NULL;
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
  if (TSDB_CODE_SUCCESS == code) {
    code = connCheckAndUpateMetric(connId);
  }

  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->body.queryFp = fp;

  doAsyncQuery(pRequest, false);
}
3254

3255
// Run a query synchronously: issue it through taosAsyncQueryImpl with
// syncQueryFn as the callback and block on a semaphore until that callback
// has fired.
//
// Returns the request object delivered to the callback (marked syncQuery,
// inCallback cleared), or NULL when taos is NULL (terrno set to
// TSDB_CODE_TSC_DISCONNECTED), when the helper state cannot be set up, when
// waiting fails, or when the callback delivered no request.
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* pSync = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (pSync == NULL) {
    return NULL;
  }

  if (tsem_init(&pSync->sem, 0, 0) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  // taos holds the connection id that the async entry point expects
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, pSync, validateOnly, source);

  if (tsem_wait(&pSync->sem) != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  int32_t code = tsem_destroy(&pSync->sem);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  SRequestObj* pRequest = pSync->pRequest;
  if (pRequest != NULL) {
    pRequest->syncQuery = true;
    pRequest->inCallback = false;
  }
  taosMemoryFree(pSync);

  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
  //          *(int64_t*)taos, pRequest);

  return pRequest;
}
3296

3297
// Run a query synchronously with a caller-supplied request id: issue it
// through taosAsyncQueryImplWithReqid with syncQueryFn as the callback and
// block on a semaphore until that callback has fired.
//
// Returns the request object delivered to the callback (marked syncQuery,
// inCallback cleared), or NULL when taos is NULL (terrno set to
// TSDB_CODE_TSC_DISCONNECTED), when the helper state cannot be set up, when
// waiting fails, or when the callback delivered no request.
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (param == NULL) {
    return NULL;
  }
  int32_t code = tsem_init(&param->sem, 0, 0);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  // taos holds the connection id that the async entry point expects
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
  code = tsem_wait(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  // release the semaphore initialized above before its storage is freed,
  // as taosQueryImpl does
  code = tsem_destroy(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  SRequestObj* pRequest = NULL;
  if (param->pRequest != NULL) {
    param->pRequest->syncQuery = true;
    pRequest = param->pRequest;
    // kept in step with taosQueryImpl, which clears the flag at this point
    param->pRequest->inCallback = false;
  }
  taosMemoryFree(param);

  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
  //   *(int64_t*)taos, pRequest);

  return pRequest;
}
3331

3332
static void fetchCallback(void* pResult, void* param, int32_t code) {
285,166,886✔
3333
  SRequestObj* pRequest = (SRequestObj*)param;
285,166,886✔
3334

3335
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
285,166,886✔
3336

3337
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
285,166,886✔
3338
           tstrerror(code), pRequest->requestId);
3339

3340
  pResultInfo->pData = pResult;
285,166,886✔
3341
  pResultInfo->numOfRows = 0;
285,166,886✔
3342

3343
  if (code != TSDB_CODE_SUCCESS) {
285,166,815✔
UNCOV
3344
    pRequest->code = code;
×
UNCOV
3345
    taosMemoryFreeClear(pResultInfo->pData);
×
UNCOV
3346
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
UNCOV
3347
    return;
×
3348
  }
3349

3350
  if (pRequest->code != TSDB_CODE_SUCCESS) {
285,166,815✔
3351
    taosMemoryFreeClear(pResultInfo->pData);
×
3352
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3353
    return;
×
3354
  }
3355

3356
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
290,175,917✔
3357
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
285,166,847✔
3358
  if (pRequest->code != TSDB_CODE_SUCCESS) {
285,165,125✔
3359
    pResultInfo->numOfRows = 0;
65✔
3360
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
65✔
3361
             tstrerror(pRequest->code), pRequest->requestId);
3362
  } else {
3363
    tscDebug(
285,165,569✔
3364
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3365
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3366

3367
    STscObj*            pTscObj = pRequest->pTscObj;
285,166,112✔
3368
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
285,166,767✔
3369
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
285,166,781✔
3370
  }
3371

3372
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
285,166,856✔
3373
}
3374

3375
// Fetch the next batch of rows for pRequest asynchronously.
//
// fp and param are recorded on the request first. fp is then invoked directly
// (with the current row count) when there is nothing to fetch from the
// scheduler:
//   - the query has no result fields or already carries an error -> 0 rows;
//   - the result is complete: a scheduled query reports 0 rows; a locally
//     executed query reports its rows once and 0 rows on later calls.
// Otherwise a scheduler fetch is issued with fetchCallback as completion
// handler.
//
// NOTE(review): when schedulerFetchRows fails the error is only logged; the
// user callback is not invoked on that path (the call is commented out).
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
  pRequest->body.fetchFp = fp;
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;

  SReqResultInfo* pResInfo = &pRequest->body.resInfo;

  // this query has no results or error exists, return directly
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    pResInfo->numOfRows = 0;
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
    pRequest->body.fetchFp(param, pRequest, pResInfo->numOfRows);

    return;
  }

  // all data has returned to App already, no need to try again
  if (pResInfo->completed) {
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);

    if (QUERY_EXEC_MODE_SCHEDULE == pRequest->body.execMode) {
      pResInfo->numOfRows = 0;
    } else if (pResInfo->localResultFetched) {
      // it is a local executed query whose rows were already handed out
      pResInfo->numOfRows = 0;
      pResInfo->current = 0;
    } else {
      // first fetch of a local result: hand out the rows as they are
      pResInfo->localResultFetched = true;
    }

    pRequest->body.fetchFp(param, pRequest, pResInfo->numOfRows);
    return;
  }

  SSchedulerReq req = {
      .syncReq = false,
      .fetchFp = fetchCallback,
      .cbParam = pRequest,
  };

  if (schedulerFetchRows(pRequest->body.queryJob, &req) != TSDB_CODE_SUCCESS) {
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
    // pRequest->body.fetchFp(param, pRequest, code);
  }
}
3421

3422
// Deliver the final result of a request to its query callback.
//
// inCallback is set for the duration of the callback. When
// tsQueryTbNotExistAsEmpty is enabled and a query failed only because the
// table does not exist, the failure is turned into an empty successful result
// before the callback runs. Afterwards the request is looked up again by its
// id and, if it can still be acquired, inCallback is cleared.
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
  pRequest->inCallback = true;

  // remember the id: the request is re-acquired by id after the callback
  // instead of being touched through pRequest
  int64_t selfRid = pRequest->self;

  const bool tbMissing = (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST);
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && tbMissing) {
    code = TSDB_CODE_SUCCESS;
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;

    const bool reqTbMissing =
        (pRequest->code == TSDB_CODE_PAR_TABLE_NOT_EXIST || pRequest->code == TSDB_CODE_TDB_TABLE_NOT_EXIST);
    if (reqTbMissing) {
      pRequest->code = TSDB_CODE_SUCCESS;
      // drop the stale error text as well
      if (pRequest->msgBuf != NULL && pRequest->msgBufLen > 0) {
        pRequest->msgBuf[0] = '\0';
      }
    }
  }

  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
           pRequest);

  if (NULL != pRequest->body.queryFp) {
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
  }

  SRequestObj* pAcquired = acquireRequest(selfRid);
  if (NULL != pAcquired) {
    pAcquired->inCallback = false;
    (void)releaseRequest(selfRid);
  }
}
1,083,570,184✔
3451

3452
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
535,876✔
3453
                       SParseSqlRes* pRes) {
3454
#ifndef TD_ENTERPRISE
3455
  return TSDB_CODE_SUCCESS;
3456
#else
3457
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
535,876✔
3458
#endif
3459
}
3460

3461
// Stamp pInfo with the current time in milliseconds: lastAccessTime is always
// refreshed, startTime only while it is still 0. A NULL pInfo is a no-op.
void updateConnAccessInfo(SConnAccessInfo* pInfo) {
  if (NULL != pInfo) {
    const int64_t nowMs = taosGetTimestampMs();

    if (0 == pInfo->startTime) {
      pInfo->startTime = nowMs;
    }
    pInfo->lastAccessTime = nowMs;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc