• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.13
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "decimal.h"
22
#include "scheduler.h"
23
#include "tdatablock.h"
24
#include "tdataformat.h"
25
#include "tdef.h"
26
#include "tglobal.h"
27
#include "tmisce.h"
28
#include "tmsg.h"
29
#include "tmsgtype.h"
30
#include "tpagedbuf.h"
31
#include "tref.h"
32
#include "tsched.h"
33
#include "tversion.h"
34

35
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
36
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
37

38
void setQueryRequest(int64_t rId) {
1,571,253✔
39
  SRequestObj* pReq = acquireRequest(rId);
1,571,253✔
40
  if (pReq != NULL) {
1,571,263✔
41
    pReq->isQuery = true;
1,571,246✔
42
    (void)releaseRequest(rId);
1,571,246✔
43
  }
44
}
1,571,250✔
45

46
static bool stringLengthCheck(const char* str, size_t maxsize) {
66,861✔
47
  if (str == NULL) {
66,861!
UNCOV
48
    return false;
×
49
  }
50

51
  size_t len = strlen(str);
66,861✔
52
  if (len <= 0 || len > maxsize) {
66,861!
UNCOV
53
    return false;
×
54
  }
55

56
  return true;
66,907✔
57
}
58

59
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
32,686✔
60

61
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_MAX_LEN); }
32,666✔
62

63
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
1,545✔
64

65
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
32,544✔
66
  char key[512] = {0};
32,544✔
67
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
32,544✔
68
  return taosStrdup(key);
32,544!
69
}
70

71
bool chkRequestKilled(void* param) {
36,161,224✔
72
  bool         killed = false;
36,161,224✔
73
  SRequestObj* pRequest = acquireRequest((int64_t)param);
36,161,224✔
74
  if (NULL == pRequest || pRequest->killed) {
36,692,780!
UNCOV
75
    killed = true;
×
76
  }
77

78
  (void)releaseRequest((int64_t)param);
36,692,780✔
79

80
  return killed;
36,629,017✔
81
}
82

83
void cleanupAppInfo() {
18,111✔
84
  taosHashCleanup(appInfo.pInstMap);
18,111✔
85
  taosHashCleanup(appInfo.pInstMapByClusterId);
18,111✔
86
  tscInfo("cluster instance map cleaned");
18,111!
87
}
18,111✔
88

89
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
90
                               SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
91

92
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
32,633✔
93
                              uint16_t port, int connType, STscObj** pObj) {
94
  TSC_ERR_RET(taos_init());
32,633!
95
  if (!validateUserName(user)) {
32,701!
UNCOV
96
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
×
97
  }
98
  int32_t code = 0;
32,697✔
99

100
  char localDb[TSDB_DB_NAME_LEN] = {0};
32,697✔
101
  if (db != NULL && strlen(db) > 0) {
32,697✔
102
    if (!validateDbName(db)) {
1,545!
UNCOV
103
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
×
104
    }
105

106
    tstrncpy(localDb, db, sizeof(localDb));
1,545✔
107
    (void)strdequote(localDb);
1,545✔
108
  }
109

110
  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
32,681✔
111
  if (auth == NULL) {
32,681✔
112
    if (!validatePassword(pass)) {
32,678!
UNCOV
113
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
×
114
    }
115

116
    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
32,667✔
117
  } else {
118
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
3✔
119
  }
120

121
  SCorEpSet epSet = {0};
32,615✔
122
  if (ip) {
32,615✔
123
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
11,671✔
124
  } else {
125
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
20,944!
126
  }
127

128
  if (port) {
32,545✔
129
    epSet.epSet.eps[0].port = port;
622✔
130
    epSet.epSet.eps[1].port = port;
622✔
131
  }
132

133
  char* key = getClusterKey(user, secretEncrypt, ip, port);
32,545✔
134
  if (NULL == key) {
32,609!
UNCOV
135
    TSC_ERR_RET(terrno);
×
136
  }
137
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
32,609✔
138
          user, db, key);
139
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
86,375✔
140
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
53,655!
141
  }
142
  // for (int32_t i = 0; i < epSet.epSet.numOfEps; i++) {
143
  //   if ((code = taosValidFqdn(tsEnableIpv6, epSet.epSet.eps[i].fqdn)) != 0) {
144
  //     taosMemFree(key);
145
  //     tscError("ipv6 flag %d, the local FQDN %s does not resolve to the ip address since %s", tsEnableIpv6,
146
  //              epSet.epSet.eps[i].fqdn, tstrerror(code));
147
  //     TSC_ERR_RET(code);
148
  //   }
149
  // }
150

151
  SAppInstInfo** pInst = NULL;
32,720✔
152
  code = taosThreadMutexLock(&appInfo.mutex);
32,720✔
153
  if (TSDB_CODE_SUCCESS != code) {
32,729!
UNCOV
154
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
UNCOV
155
    TSC_ERR_RET(code);
×
156
  }
157

158
  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
32,729✔
159
  SAppInstInfo* p = NULL;
32,729✔
160
  if (pInst == NULL) {
32,729✔
161
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
18,402!
162
    if (NULL == p) {
18,402!
UNCOV
163
      TSC_ERR_JRET(terrno);
×
164
    }
165
    p->mgmtEp = epSet;
18,402✔
166
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
18,402✔
167
    if (TSDB_CODE_SUCCESS != code) {
18,402!
UNCOV
168
      taosMemoryFree(p);
×
UNCOV
169
      TSC_ERR_JRET(code);
×
170
    }
171
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
18,402✔
172
    if (TSDB_CODE_SUCCESS != code) {
18,402!
UNCOV
173
      taosMemoryFree(p);
×
UNCOV
174
      TSC_ERR_JRET(code);
×
175
    }
176
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
18,402✔
177
    if (TSDB_CODE_SUCCESS != code) {
18,402!
UNCOV
178
      destroyAppInst(&p);
×
UNCOV
179
      TSC_ERR_JRET(code);
×
180
    }
181
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
18,402✔
182
    if (TSDB_CODE_SUCCESS != code) {
18,402!
UNCOV
183
      destroyAppInst(&p);
×
UNCOV
184
      TSC_ERR_JRET(code);
×
185
    }
186
    p->instKey = key;
18,402✔
187
    key = NULL;
18,402✔
188
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);
18,402!
189

190
    pInst = &p;
18,402✔
191
  } else {
192
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
14,327!
UNCOV
193
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
×
UNCOV
194
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
×
195
    }
196
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
197
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
14,327✔
198
  }
199

200
_return:
32,729✔
201

202
  if (TSDB_CODE_SUCCESS != code) {
32,729!
UNCOV
203
    (void)taosThreadMutexUnlock(&appInfo.mutex);
×
UNCOV
204
    taosMemoryFreeClear(key);
×
205
    return code;
×
206
  } else {
207
    code = taosThreadMutexUnlock(&appInfo.mutex);
32,729✔
208
    taosMemoryFreeClear(key);
32,727!
209
    if (TSDB_CODE_SUCCESS != code) {
32,726!
UNCOV
210
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
UNCOV
211
      return code;
×
212
    }
213
    return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, *pInst, connType, pObj);
32,726✔
214
  }
215
}
216

217
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
218
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
219
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
220
//     return *ppAppInstInfo;
221
//   } else {
222
//     return NULL;
223
//   }
224
// }
225

226
void freeQueryParam(SSyncQueryParam* param) {
943✔
227
  if (param == NULL) return;
943!
228
  if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
943!
UNCOV
229
    tscError("failed to destroy semaphore in freeQueryParam");
×
230
  }
231
  taosMemoryFree(param);
943!
232
}
233

234
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
11,120,295✔
235
                     SRequestObj** pRequest, int64_t reqid) {
236
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
11,120,295✔
237
  if (TSDB_CODE_SUCCESS != code) {
11,135,807!
UNCOV
238
    tscError("failed to malloc sqlObj, %s", sql);
×
UNCOV
239
    return code;
×
240
  }
241

242
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
11,135,807!
243
  if ((*pRequest)->sqlstr == NULL) {
11,127,303!
UNCOV
244
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
UNCOV
245
    destroyRequest(*pRequest);
×
246
    *pRequest = NULL;
×
247
    return terrno;
×
248
  }
249

250
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
11,127,303✔
251
  (*pRequest)->sqlstr[sqlLen] = 0;
11,130,024✔
252
  (*pRequest)->sqlLen = sqlLen;
11,130,024✔
253
  (*pRequest)->validateOnly = validateSql;
11,130,024✔
254
  (*pRequest)->isStmtBind = false;
11,130,024✔
255

256
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
11,130,024✔
257

258
  STscObj* pTscObj = (*pRequest)->pTscObj;
11,130,024✔
259
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
11,130,024✔
260
                             sizeof((*pRequest)->self));
261
  if (err) {
11,122,081!
UNCOV
262
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
263
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
264
    destroyRequest(*pRequest);
×
UNCOV
265
    *pRequest = NULL;
×
266
    return terrno;
×
267
  }
268

269
  (*pRequest)->allocatorRefId = -1;
11,122,081✔
270
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
11,122,081✔
271
    if (TSDB_CODE_SUCCESS !=
1,382,032!
272
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
1,381,985✔
UNCOV
273
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
274
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
275
      destroyRequest(*pRequest);
×
UNCOV
276
      *pRequest = NULL;
×
277
      return terrno;
×
278
    }
279
  }
280

281
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
11,130,351✔
282
  return TSDB_CODE_SUCCESS;
11,123,034✔
283
}
284

285
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
461✔
286
  int32_t code =
287
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
461✔
288
  if (TSDB_CODE_SUCCESS == code) {
461!
289
    pRequest->relation.prevRefId = (*pNewRequest)->self;
461✔
290
    (*pNewRequest)->relation.nextRefId = pRequest->self;
461✔
291
    (*pNewRequest)->relation.userRefId = pRequest->self;
461✔
292
    (*pNewRequest)->isSubReq = true;
461✔
293
  }
294
  return code;
461✔
295
}
296

297
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
20,823✔
298
  STscObj* pTscObj = pRequest->pTscObj;
20,823✔
299

300
  SParseContext cxt = {
20,823✔
301
      .requestId = pRequest->requestId,
20,823✔
302
      .requestRid = pRequest->self,
20,823✔
303
      .acctId = pTscObj->acctId,
20,823✔
304
      .db = pRequest->pDb,
20,823✔
305
      .topicQuery = topicQuery,
306
      .pSql = pRequest->sqlstr,
20,823✔
307
      .sqlLen = pRequest->sqlLen,
20,823✔
308
      .pMsg = pRequest->msgBuf,
20,823✔
309
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
310
      .pTransporter = pTscObj->pAppInfo->pTransporter,
20,823✔
311
      .pStmtCb = pStmtCb,
312
      .pUser = pTscObj->user,
20,823✔
313
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
20,823✔
314
      .enableSysInfo = pTscObj->sysInfo,
20,823✔
315
      .svrVer = pTscObj->sVer,
20,823✔
316
      .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
20,823✔
317
      .isStmtBind = pRequest->isStmtBind,
20,823✔
318
      .setQueryFp = setQueryRequest,
319
      .timezone = pTscObj->optionInfo.timezone,
20,823✔
320
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
20,823✔
321
  };
322

323
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
20,823✔
324
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
20,864✔
325
  if (code != TSDB_CODE_SUCCESS) {
20,856!
UNCOV
326
    return code;
×
327
  }
328

329
  code = qParseSql(&cxt, pQuery);
20,856✔
330
  if (TSDB_CODE_SUCCESS == code) {
20,838✔
331
    if ((*pQuery)->haveResultSet) {
20,813!
UNCOV
332
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols,
×
UNCOV
333
                              (*pQuery)->pResExtSchema, pRequest->isStmtBind);
×
334
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
×
335
    }
336
  }
337

338
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
20,858!
339
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
20,819✔
340
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
20,819✔
341
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
20,819✔
342
  }
343

344
  taosArrayDestroy(cxt.pTableMetaPos);
20,858✔
345
  taosArrayDestroy(cxt.pTableVgroupPos);
20,848✔
346

347
  return code;
20,857✔
348
}
349

UNCOV
350
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
×
UNCOV
351
  SRetrieveTableRsp* pRsp = NULL;
×
352
  int8_t             biMode = atomic_load_8(&pRequest->pTscObj->biMode);
×
353
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode,
×
354
                              pRequest->pTscObj->optionInfo.charsetCxt);
×
355
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
×
356
    code =
357
        setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4, pRequest->isStmtBind);
×
358
  }
359

UNCOV
360
  return code;
×
361
}
362

363
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
1,127✔
364
  // drop table if exists not_exists_table
365
  if (NULL == pQuery->pCmdMsg) {
1,127!
UNCOV
366
    return TSDB_CODE_SUCCESS;
×
367
  }
368

369
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
1,127✔
370
  pRequest->type = pMsgInfo->msgType;
1,127✔
371
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
1,127✔
372
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
1,127✔
373

374
  STscObj*      pTscObj = pRequest->pTscObj;
1,127✔
375
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
1,127✔
376

377
  // int64_t transporterId = 0;
378
  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
1,129!
379
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
1,135!
380
  return TSDB_CODE_SUCCESS;
1,135✔
381
}
382

383
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
21,368,858✔
384

385
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
130,459✔
386
  SRetrieveTableRsp* pRsp = NULL;
130,459✔
387
  if (pRequest->validateOnly) {
130,459✔
388
    doRequestCallback(pRequest, 0);
27✔
389
    return;
27✔
390
  }
391

392
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, pQuery->pRoot, &pRsp,
130,433✔
393
                              atomic_load_8(&pRequest->pTscObj->biMode), pRequest->pTscObj->optionInfo.charsetCxt);
130,432✔
394
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
130,432✔
395
    code =
396
        setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4, pRequest->isStmtBind);
119,991✔
397
  }
398

399
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
130,431✔
400
  pRequest->code = code;
130,431✔
401

402
  if (pRequest->code != TSDB_CODE_SUCCESS) {
130,431✔
403
    pResultInfo->numOfRows = 0;
8✔
404
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
8!
405
             pRequest->requestId);
406
  } else {
407
    tscDebug(
130,423✔
408
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
409
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
410
  }
411

412
  doRequestCallback(pRequest, code);
130,434✔
413
}
414

415
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
40,048✔
416
  if (pRequest->validateOnly) {
40,048!
UNCOV
417
    doRequestCallback(pRequest, 0);
×
UNCOV
418
    return TSDB_CODE_SUCCESS;
×
419
  }
420

421
  // drop table if exists not_exists_table
422
  if (NULL == pQuery->pCmdMsg) {
40,048✔
423
    doRequestCallback(pRequest, 0);
1✔
424
    return TSDB_CODE_SUCCESS;
1✔
425
  }
426

427
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
40,047✔
428
  pRequest->type = pMsgInfo->msgType;
40,047✔
429
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
40,047✔
430
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
40,047✔
431

432
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
40,047✔
433
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
40,036✔
434

435
  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
40,053✔
436
  if (code) {
40,098!
UNCOV
437
    doRequestCallback(pRequest, code);
×
438
  }
439
  return code;
40,099✔
440
}
441

442
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
247,948✔
443
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
247,948✔
444
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
247,948✔
445

446
  if (node1->load < node2->load) {
247,948!
UNCOV
447
    return -1;
×
448
  }
449

450
  return node1->load > node2->load;
247,948✔
451
}
452

453
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
87,006✔
454
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
87,006!
455
  if (pInfo->pQnodeList) {
87,006✔
456
    taosArrayDestroy(pInfo->pQnodeList);
86,339✔
457
    pInfo->pQnodeList = NULL;
86,339✔
458
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
86,339✔
459
  }
460

461
  if (pNodeList) {
87,006!
462
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
87,006✔
463
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
87,006✔
464
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
87,006✔
465
             taosArrayGetSize(pInfo->pQnodeList));
466
  }
467
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
87,006!
468

469
  return TSDB_CODE_SUCCESS;
87,006✔
470
}
471

472
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
11,097,654✔
473
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
11,097,654✔
474
    *required = false;
10,441,176✔
475
    return TSDB_CODE_SUCCESS;
10,441,176✔
476
  }
477

478
  int32_t       code = TSDB_CODE_SUCCESS;
656,478✔
479
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
656,478✔
480
  *required = false;
656,478✔
481

482
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
656,478!
483
  *required = (NULL == pInfo->pQnodeList);
656,478✔
484
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
656,478!
485
  return TSDB_CODE_SUCCESS;
656,478✔
486
}
487

UNCOV
488
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
×
UNCOV
489
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
×
490
  int32_t       code = 0;
×
491

492
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
×
UNCOV
493
  if (pInfo->pQnodeList) {
×
494
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
×
495
  }
496
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
×
UNCOV
497
  if (NULL == *pNodeList) {
×
498
    SCatalog* pCatalog = NULL;
×
499
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
500
    if (TSDB_CODE_SUCCESS == code) {
×
501
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
×
502
      if (NULL == pNodeList) {
×
503
        TSC_ERR_RET(terrno);
×
504
      }
505
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
UNCOV
506
                               .requestId = pRequest->requestId,
×
507
                               .requestObjRefId = pRequest->self,
×
508
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
509
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
×
510
    }
511

UNCOV
512
    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
×
UNCOV
513
      code = updateQnodeList(pInfo, *pNodeList);
×
514
    }
515
  }
516

UNCOV
517
  return code;
×
518
}
519

520
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
57,056✔
521
  pRequest->type = pQuery->msgType;
57,056✔
522
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
57,056✔
523

524
  SPlanContext cxt = {.queryId = pRequest->requestId,
114,193✔
525
                      .acctId = pRequest->pTscObj->acctId,
57,060✔
526
                      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
57,060✔
527
                      .pAstRoot = pQuery->pRoot,
57,133✔
528
                      .showRewrite = pQuery->showRewrite,
57,133✔
529
                      .pMsg = pRequest->msgBuf,
57,133✔
530
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
531
                      .pUser = pRequest->pTscObj->user,
57,133✔
532
                      .timezone = pRequest->pTscObj->optionInfo.timezone,
57,133✔
533
                      .sysInfo = pRequest->pTscObj->sysInfo};
57,133✔
534

535
  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
57,133✔
536
}
537

538
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
1,339,711✔
539
                         const SExtSchema* pExtSchema, bool isStmt) {
540
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
1,339,711!
UNCOV
541
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
UNCOV
542
    return TSDB_CODE_INVALID_PARA;
×
543
  }
544

545
  pResInfo->numOfCols = numOfCols;
1,339,751✔
546
  if (pResInfo->fields != NULL) {
1,339,751✔
547
    taosMemoryFree(pResInfo->fields);
37!
548
  }
549
  if (pResInfo->userFields != NULL) {
1,339,751✔
550
    taosMemoryFree(pResInfo->userFields);
37!
551
  }
552
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
1,339,751!
553
  if (NULL == pResInfo->fields) return terrno;
1,339,736!
554
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
1,339,736!
555
  if (NULL == pResInfo->userFields) {
1,339,722!
UNCOV
556
    taosMemoryFree(pResInfo->fields);
×
UNCOV
557
    return terrno;
×
558
  }
559
  if (numOfCols != pResInfo->numOfCols) {
1,339,722!
UNCOV
560
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
UNCOV
561
    return TSDB_CODE_FAILED;
×
562
  }
563

564
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
6,581,753✔
565
    pResInfo->fields[i].type = pSchema[i].type;
5,242,031✔
566

567
    pResInfo->userFields[i].type = pSchema[i].type;
5,242,031✔
568
    // userFields must convert to type bytes, no matter isStmt or not
569
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
5,242,031✔
570
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
5,241,993✔
571
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
5,241,917!
572
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
8,560✔
573
    }
574

575
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
5,242,031✔
576
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
5,242,031✔
577
  }
578
  return TSDB_CODE_SUCCESS;
1,339,722✔
579
}
580

581
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
1,010,973✔
582
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
1,010,973!
583
      precision != TSDB_TIME_PRECISION_NANO) {
UNCOV
584
    return;
×
585
  }
586

587
  pResInfo->precision = precision;
1,010,973✔
588
}
589

590
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
703,595✔
591
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
703,595✔
592
  if (NULL == nodeList) {
703,696!
UNCOV
593
    return terrno;
×
594
  }
595
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
703,714✔
596

597
  int32_t dbNum = taosArrayGetSize(pDbVgList);
703,714✔
598
  for (int32_t i = 0; i < dbNum; ++i) {
1,382,933✔
599
    SArray* pVg = taosArrayGetP(pDbVgList, i);
679,274✔
600
    if (NULL == pVg) {
679,306!
UNCOV
601
      continue;
×
602
    }
603
    int32_t vgNum = taosArrayGetSize(pVg);
679,306✔
604
    if (vgNum <= 0) {
679,314✔
605
      continue;
631✔
606
    }
607

608
    for (int32_t j = 0; j < vgNum; ++j) {
3,436,981✔
609
      SVgroupInfo* pInfo = taosArrayGet(pVg, j);
2,758,380✔
610
      if (NULL == pInfo) {
2,758,328!
UNCOV
611
        taosArrayDestroy(nodeList);
×
UNCOV
612
        return TSDB_CODE_OUT_OF_RANGE;
×
613
      }
614
      SQueryNodeLoad load = {0};
2,758,328✔
615
      load.addr.nodeId = pInfo->vgId;
2,758,328✔
616
      load.addr.epSet = pInfo->epSet;
2,758,328✔
617

618
      if (NULL == taosArrayPush(nodeList, &load)) {
2,758,298!
UNCOV
619
        taosArrayDestroy(nodeList);
×
UNCOV
620
        return terrno;
×
621
      }
622
    }
623
  }
624

625
  int32_t vnodeNum = taosArrayGetSize(nodeList);
703,659✔
626
  if (vnodeNum > 0) {
703,738✔
627
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
677,848✔
628
    goto _return;
677,850✔
629
  }
630

631
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
25,890✔
632
  if (mnodeNum <= 0) {
25,887!
UNCOV
633
    tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
×
UNCOV
634
    goto _return;
×
635
  }
636

637
  void* pData = taosArrayGet(pMnodeList, 0);
25,887✔
638
  if (NULL == pData) {
25,885!
UNCOV
639
    taosArrayDestroy(nodeList);
×
UNCOV
640
    return TSDB_CODE_OUT_OF_RANGE;
×
641
  }
642
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
25,885!
UNCOV
643
    taosArrayDestroy(nodeList);
×
UNCOV
644
    return terrno;
×
645
  }
646

647
  tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
25,888✔
648

649
_return:
21,600✔
650

651
  *pNodeList = nodeList;
703,712✔
652

653
  return TSDB_CODE_SUCCESS;
703,712✔
654
}
655

656
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
295,721✔
657
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
295,721✔
658
  if (NULL == nodeList) {
295,721!
UNCOV
659
    return terrno;
×
660
  }
661

662
  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
295,721✔
663
  if (qNodeNum > 0) {
295,721✔
664
    void* pData = taosArrayGet(pQnodeList, 0);
295,399✔
665
    if (NULL == pData) {
295,399!
UNCOV
666
      taosArrayDestroy(nodeList);
×
UNCOV
667
      return TSDB_CODE_OUT_OF_RANGE;
×
668
    }
669
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
295,399!
UNCOV
670
      taosArrayDestroy(nodeList);
×
UNCOV
671
      return terrno;
×
672
    }
673
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
295,399✔
674
    goto _return;
295,399✔
675
  }
676

677
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
322✔
678
  if (mnodeNum <= 0) {
322✔
679
    tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
4!
680
    goto _return;
4✔
681
  }
682

683
  void* pData = taosArrayGet(pMnodeList, 0);
318✔
684
  if (NULL == pData) {
318!
UNCOV
685
    taosArrayDestroy(nodeList);
×
UNCOV
686
    return TSDB_CODE_OUT_OF_RANGE;
×
687
  }
688
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
318!
UNCOV
689
    taosArrayDestroy(nodeList);
×
UNCOV
690
    return terrno;
×
691
  }
692

693
  tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
318!
694

UNCOV
695
_return:
×
696

697
  *pNodeList = nodeList;
295,721✔
698

699
  return TSDB_CODE_SUCCESS;
295,721✔
700
}
701

702
void freeVgList(void* list) {
36,785✔
703
  SArray* pList = *(SArray**)list;
36,785✔
704
  taosArrayDestroy(pList);
36,785✔
705
}
36,826✔
706

707
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
942,255✔
708
  SArray* pDbVgList = NULL;
942,255✔
709
  SArray* pQnodeList = NULL;
942,255✔
710
  FDelete fp = NULL;
942,255✔
711
  int32_t code = 0;
942,255✔
712

713
  switch (tsQueryPolicy) {
942,255!
714
    case QUERY_POLICY_VNODE:
646,574✔
715
    case QUERY_POLICY_CLIENT: {
716
      if (pResultMeta) {
646,574!
717
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
646,601✔
718
        if (NULL == pDbVgList) {
646,566!
UNCOV
719
          code = terrno;
×
UNCOV
720
          goto _return;
×
721
        }
722
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
646,566✔
723
        for (int32_t i = 0; i < dbNum; ++i) {
1,289,030✔
724
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
642,450✔
725
          if (pRes->code || NULL == pRes->pRes) {
642,444!
UNCOV
726
            continue;
×
727
          }
728

729
          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
1,284,966!
UNCOV
730
            code = terrno;
×
UNCOV
731
            goto _return;
×
732
          }
733
        }
734
      } else {
UNCOV
735
        fp = freeVgList;
×
736

737
        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
×
UNCOV
738
        if (dbNum > 0) {
×
739
          SCatalog*     pCtg = NULL;
×
740
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
×
741
          code = catalogGetHandle(pInst->clusterId, &pCtg);
×
742
          if (code != TSDB_CODE_SUCCESS) {
×
743
            goto _return;
×
744
          }
745

UNCOV
746
          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
×
UNCOV
747
          if (NULL == pDbVgList) {
×
748
            code = terrno;
×
749
            goto _return;
×
750
          }
751
          SArray* pVgList = NULL;
×
UNCOV
752
          for (int32_t i = 0; i < dbNum; ++i) {
×
753
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
×
754
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
×
755
                                     .requestId = pRequest->requestId,
×
756
                                     .requestObjRefId = pRequest->self,
×
757
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
×
758

759
            // catalogGetDBVgList will handle dbFName == null.
UNCOV
760
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
×
UNCOV
761
            if (code) {
×
762
              goto _return;
×
763
            }
764

UNCOV
765
            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
×
UNCOV
766
              code = terrno;
×
767
              goto _return;
×
768
            }
769
          }
770
        }
771
      }
772

773
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
646,580✔
774
      break;
646,602✔
775
    }
776
    case QUERY_POLICY_HYBRID:
295,716✔
777
    case QUERY_POLICY_QNODE: {
778
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
301,087✔
779
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
5,371✔
780
        if (pRes->code) {
5,371!
UNCOV
781
          pQnodeList = NULL;
×
782
        } else {
783
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
5,371✔
784
          if (NULL == pQnodeList) {
5,371!
UNCOV
785
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
786
            goto _return;
×
787
          }
788
        }
789
      } else {
790
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
290,351✔
791
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
290,351!
792
        if (pInst->pQnodeList) {
290,350!
793
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
290,350✔
794
          if (NULL == pQnodeList) {
290,350!
UNCOV
795
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
UNCOV
796
            goto _return;
×
797
          }
798
        }
799
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
290,350!
800
      }
801

802
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
295,721✔
803
      break;
295,721✔
804
    }
UNCOV
805
    default:
×
UNCOV
806
      tscError("unknown query policy: %d", tsQueryPolicy);
×
807
      return TSDB_CODE_APP_ERROR;
×
808
  }
809

810
_return:
942,323✔
811
  taosArrayDestroyEx(pDbVgList, fp);
942,323✔
812
  taosArrayDestroy(pQnodeList);
942,338✔
813

814
  return code;
942,337✔
815
}
816

817
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
57,030✔
818
  SArray* pDbVgList = NULL;
57,030✔
819
  SArray* pQnodeList = NULL;
57,030✔
820
  int32_t code = 0;
57,030✔
821

822
  switch (tsQueryPolicy) {
57,030!
823
    case QUERY_POLICY_VNODE:
57,051✔
824
    case QUERY_POLICY_CLIENT: {
825
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
57,051✔
826
      if (dbNum > 0) {
57,101✔
827
        SCatalog*     pCtg = NULL;
36,817✔
828
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
36,817✔
829
        code = catalogGetHandle(pInst->clusterId, &pCtg);
36,817✔
830
        if (code != TSDB_CODE_SUCCESS) {
36,790!
UNCOV
831
          goto _return;
×
832
        }
833

834
        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
36,790✔
835
        if (NULL == pDbVgList) {
36,799✔
836
          code = terrno;
20✔
UNCOV
837
          goto _return;
×
838
        }
839
        SArray* pVgList = NULL;
36,779✔
840
        for (int32_t i = 0; i < dbNum; ++i) {
73,592✔
841
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
36,758✔
842
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
36,773✔
843
                                   .requestId = pRequest->requestId,
36,773✔
844
                                   .requestObjRefId = pRequest->self,
36,773✔
845
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
36,773✔
846

847
          // catalogGetDBVgList will handle dbFName == null.
848
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
36,824✔
849
          if (code) {
36,808!
UNCOV
850
            goto _return;
×
851
          }
852

853
          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
36,813!
UNCOV
854
            code = terrno;
×
UNCOV
855
            goto _return;
×
856
          }
857
        }
858
      }
859

860
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
57,118✔
861
      break;
57,107✔
862
    }
UNCOV
863
    case QUERY_POLICY_HYBRID:
×
864
    case QUERY_POLICY_QNODE: {
865
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
×
866

867
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
×
UNCOV
868
      break;
×
869
    }
870
    default:
×
UNCOV
871
      tscError("unknown query policy: %d", tsQueryPolicy);
×
872
      return TSDB_CODE_APP_ERROR;
×
873
  }
874

875
_return:
57,107✔
876

877
  taosArrayDestroyEx(pDbVgList, freeVgList);
57,107✔
878
  taosArrayDestroy(pQnodeList);
57,097✔
879

880
  return code;
57,111✔
881
}
882

883
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
57,059✔
884
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
57,059✔
885

886
  SExecResult      res = {0};
57,059✔
887
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
57,059✔
888
                           .requestId = pRequest->requestId,
57,059✔
889
                           .requestObjRefId = pRequest->self};
57,059✔
890
  SSchedulerReq    req = {
114,135✔
891
         .syncReq = true,
892
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
57,059✔
893
         .pConn = &conn,
894
         .pNodeList = pNodeList,
895
         .pDag = pDag,
896
         .sql = pRequest->sqlstr,
57,059✔
897
         .startTs = pRequest->metric.start,
57,059✔
898
         .execFp = NULL,
899
         .cbParam = NULL,
900
         .chkKillFp = chkRequestKilled,
901
         .chkKillParam = (void*)pRequest->self,
57,059✔
902
         .pExecRes = &res,
903
         .source = pRequest->source,
57,059✔
904
         .pWorkerCb = getTaskPoolWorkerCb(),
57,059✔
905
  };
906

907
  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
57,076✔
908

909
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
57,107✔
910
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
57,103✔
911

912
  if (code != TSDB_CODE_SUCCESS) {
57,103!
UNCOV
913
    schedulerFreeJob(&pRequest->body.queryJob, 0);
×
914

915
    pRequest->code = code;
×
UNCOV
916
    terrno = code;
×
917
    return pRequest->code;
9✔
918
  }
919

920
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
57,103!
921
      TDMT_VND_CREATE_TABLE == pRequest->type) {
171✔
922
    pRequest->body.resInfo.numOfRows = res.numOfRows;
57,058✔
923
    if (TDMT_VND_SUBMIT == pRequest->type) {
57,058✔
924
      STscObj*            pTscObj = pRequest->pTscObj;
56,947✔
925
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
56,947✔
926
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
56,947✔
927
    }
928

929
    schedulerFreeJob(&pRequest->body.queryJob, 0);
57,074✔
930
  }
931

932
  pRequest->code = res.code;
57,123✔
933
  terrno = res.code;
57,123✔
934
  return pRequest->code;
57,110✔
935
}
936

937
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
9,734,567✔
938
  SArray*      pArray = NULL;
9,734,567✔
939
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
9,734,567✔
940
  if (NULL == pRsp->aCreateTbRsp) {
9,734,567✔
941
    return TSDB_CODE_SUCCESS;
9,662,415✔
942
  }
943

944
  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
72,152✔
945
  for (int32_t i = 0; i < tbNum; ++i) {
157,121✔
946
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
81,587✔
947
    if (pTbRsp->pMeta) {
81,582✔
948
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
60,280!
949
    }
950
  }
951

952
  return TSDB_CODE_SUCCESS;
75,534✔
953
}
954

955
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
750,051✔
956
  int32_t code = 0;
750,051✔
957
  SArray* pArray = NULL;
750,051✔
958
  SArray* pTbArray = (SArray*)res;
750,051✔
959
  int32_t tbNum = taosArrayGetSize(pTbArray);
750,051✔
960
  if (tbNum <= 0) {
750,056!
UNCOV
961
    return TSDB_CODE_SUCCESS;
×
962
  }
963

964
  pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
750,056✔
965
  if (NULL == pArray) {
750,070✔
966
    return terrno;
4✔
967
  }
968

969
  for (int32_t i = 0; i < tbNum; ++i) {
2,130,208✔
970
    STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
1,380,135✔
971
    if (NULL == tbInfo) {
1,380,137!
UNCOV
972
      code = terrno;
×
UNCOV
973
      goto _return;
×
974
    }
975
    STbSVersion tbSver = {
1,380,137✔
976
        .tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion, .rver = tbInfo->rversion};
1,380,137✔
977
    if (NULL == taosArrayPush(pArray, &tbSver)) {
1,380,142!
UNCOV
978
      code = terrno;
×
UNCOV
979
      goto _return;
×
980
    }
981
  }
982

983
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
750,073✔
984
                           .requestId = pRequest->requestId,
750,073✔
985
                           .requestObjRefId = pRequest->self,
750,073✔
986
                           .mgmtEps = *epset};
987

988
  code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
750,073✔
989

990
_return:
750,059✔
991

992
  taosArrayDestroy(pArray);
750,059✔
993
  return code;
750,076✔
994
}
995

996
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
6,456✔
997
  return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
6,456✔
998
}
999

1000
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
213,151✔
1001
  return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
213,151✔
1002
}
1003

1004
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
10,798,536✔
1005
  if (NULL == pRequest->body.resInfo.execRes.res) {
10,798,536✔
1006
    return pRequest->code;
222,463✔
1007
  }
1008

1009
  SCatalog*     pCatalog = NULL;
10,576,073✔
1010
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
10,576,073✔
1011

1012
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
10,576,542✔
1013
  if (code) {
10,578,559!
UNCOV
1014
    return code;
×
1015
  }
1016

1017
  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
10,578,559✔
1018
  SExecResult* pRes = &pRequest->body.resInfo.execRes;
10,610,505✔
1019

1020
  switch (pRes->msgType) {
10,610,505✔
1021
    case TDMT_VND_ALTER_TABLE:
1,621✔
1022
    case TDMT_MND_ALTER_STB: {
1023
      code = handleAlterTbExecRes(pRes->res, pCatalog);
1,621✔
1024
      break;
1,621✔
1025
    }
1026
    case TDMT_VND_CREATE_TABLE: {
110,597✔
1027
      SArray* pList = (SArray*)pRes->res;
110,597✔
1028
      int32_t num = taosArrayGetSize(pList);
110,597✔
1029
      for (int32_t i = 0; i < num; ++i) {
256,377✔
1030
        void* res = taosArrayGetP(pList, i);
145,767✔
1031
        // handleCreateTbExecRes will handle res == null
1032
        code = handleCreateTbExecRes(res, pCatalog);
145,757✔
1033
      }
1034
      break;
110,610✔
1035
    }
1036
    case TDMT_MND_CREATE_STB: {
933✔
1037
      code = handleCreateTbExecRes(pRes->res, pCatalog);
933✔
1038
      break;
933✔
1039
    }
1040
    case TDMT_VND_SUBMIT: {
9,740,329✔
1041
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
9,740,329✔
1042

1043
      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
9,744,337✔
1044
      break;
9,736,845✔
1045
    }
1046
    case TDMT_SCH_QUERY:
750,068✔
1047
    case TDMT_SCH_MERGE_QUERY: {
1048
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
750,068✔
1049
      break;
750,066✔
1050
    }
1051
    default:
6,957✔
1052
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
6,957!
1053
               pRequest->type, pRequest->requestId);
UNCOV
1054
      code = TSDB_CODE_APP_ERROR;
×
1055
  }
1056

1057
  return code;
10,600,075✔
1058
}
1059

1060
static bool incompletaFileParsing(SNode* pStmt) {
10,766,236✔
1061
  return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
10,766,236✔
1062
}
1063

1064
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
438✔
1065
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
438✔
1066

1067
  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
438✔
1068
  if (TSDB_CODE_SUCCESS == code) {
438!
1069
    int64_t analyseStart = taosGetTimestampUs();
438✔
1070
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
438✔
1071
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
438✔
1072
  }
1073

1074
  if (TSDB_CODE_SUCCESS == code) {
438!
1075
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
438✔
1076
  }
1077

1078
  code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
438✔
1079
  handleQueryAnslyseRes(pWrapper, NULL, code);
438✔
1080
}
438✔
1081

1082
void returnToUser(SRequestObj* pRequest) {
81,881✔
1083
  if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
81,881!
1084
    // return to client
1085
    doRequestCallback(pRequest, pRequest->code);
81,880✔
1086
    return;
81,880✔
1087
  }
1088

1089
  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
1✔
1090
  if (pUserReq) {
1!
1091
    pUserReq->code = pRequest->code;
1✔
1092
    // return to client
1093
    doRequestCallback(pUserReq, pUserReq->code);
1✔
1094
    (void)releaseRequest(pRequest->relation.userRefId);
1✔
1095
    return;
1✔
1096
  } else {
UNCOV
1097
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1098
             pRequest->relation.userRefId, pRequest->requestId);
1099
  }
1100
}
1101

1102
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
438✔
1103
  int64_t     lastTs = 0;
438✔
1104
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
438✔
1105
  int32_t     numOfFields = taos_num_fields(pRes);
438✔
1106

1107
  int32_t code = createDataBlock(pBlock);
438✔
1108
  if (code) {
438!
UNCOV
1109
    return code;
×
1110
  }
1111

1112
  for (int32_t i = 0; i < numOfFields; ++i) {
1,752✔
1113
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
1,314✔
1114
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
1,314✔
1115
    if (TSDB_CODE_SUCCESS != code) {
1,314!
UNCOV
1116
      blockDataDestroy(*pBlock);
×
UNCOV
1117
      return code;
×
1118
    }
1119
  }
1120

1121
  code = blockDataEnsureCapacity(*pBlock, numOfRows);
438✔
1122
  if (TSDB_CODE_SUCCESS != code) {
438!
UNCOV
1123
    blockDataDestroy(*pBlock);
×
UNCOV
1124
    return code;
×
1125
  }
1126

1127
  for (int32_t i = 0; i < numOfRows; ++i) {
1,328✔
1128
    TAOS_ROW pRow = taos_fetch_row(pRes);
890✔
1129
    if (NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
890!
UNCOV
1130
      tscError("invalid data from vnode");
×
UNCOV
1131
      blockDataDestroy(*pBlock);
×
1132
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1133
    }
1134
    int64_t ts = *(int64_t*)pRow[0];
890✔
1135
    if (lastTs < ts) {
890✔
1136
      lastTs = ts;
503✔
1137
    }
1138

1139
    for (int32_t j = 0; j < numOfFields; ++j) {
3,560✔
1140
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
2,670✔
1141
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
2,670✔
1142
      if (TSDB_CODE_SUCCESS != code) {
2,670!
UNCOV
1143
        blockDataDestroy(*pBlock);
×
UNCOV
1144
        return code;
×
1145
      }
1146
    }
1147

1148
    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
890!
1149
            *(int64_t*)pRow[2]);
1150
  }
1151

1152
  (*pBlock)->info.window.ekey = lastTs;
438✔
1153
  (*pBlock)->info.rows = numOfRows;
438✔
1154

1155
  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
438!
1156
  return TSDB_CODE_SUCCESS;
438✔
1157
}
1158

1159
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
439✔
1160
  SRequestObj* pRequest = (SRequestObj*)res;
439✔
1161
  if (pRequest->code) {
439✔
1162
    returnToUser(pRequest);
1✔
1163
    return;
1✔
1164
  }
1165

1166
  SSDataBlock* pBlock = NULL;
438✔
1167
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
438✔
1168
  if (TSDB_CODE_SUCCESS != pRequest->code) {
438!
UNCOV
1169
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
×
1170
             tstrerror(pRequest->code));
1171
    returnToUser(pRequest);
×
UNCOV
1172
    return;
×
1173
  }
1174

1175
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
438✔
1176
  if (pNextReq) {
438!
1177
    continuePostSubQuery(pNextReq, pBlock);
438✔
1178
    (void)releaseRequest(pRequest->relation.nextRefId);
438✔
1179
  } else {
UNCOV
1180
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1181
             pRequest->relation.nextRefId, pRequest->requestId);
1182
  }
1183

1184
  blockDataDestroy(pBlock);
438✔
1185
}
1186

1187
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
439✔
1188
  SRequestObj* pRequest = pWrapper->pRequest;
439✔
1189
  if (TD_RES_QUERY(pRequest)) {
439!
1190
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
439✔
1191
    return;
439✔
1192
  }
1193

UNCOV
1194
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
UNCOV
1195
  if (pNextReq) {
×
1196
    continuePostSubQuery(pNextReq, NULL);
×
1197
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1198
  } else {
1199
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1200
             pRequest->relation.nextRefId, pRequest->requestId);
1201
  }
1202
}
1203

1204
// todo refacto the error code  mgmt
1205
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
10,711,145✔
1206
  SSqlCallbackWrapper* pWrapper = param;
10,711,145✔
1207
  SRequestObj*         pRequest = pWrapper->pRequest;
10,711,145✔
1208
  STscObj*             pTscObj = pRequest->pTscObj;
10,711,145✔
1209

1210
  pRequest->code = code;
10,711,145✔
1211
  if (pResult) {
10,711,145!
1212
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
10,711,290✔
1213
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
10,723,777✔
1214
  }
1215

1216
  int32_t type = pRequest->type;
10,723,632✔
1217
  if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
10,723,632✔
1218
    if (pResult) {
9,814,665!
1219
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;
9,825,184✔
1220

1221
      // record the insert rows
1222
      if (TDMT_VND_SUBMIT == type) {
9,825,184✔
1223
        SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
9,656,048✔
1224
        (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
9,656,048✔
1225
      }
1226
    }
1227
    schedulerFreeJob(&pRequest->body.queryJob, 0);
9,847,464✔
1228
  }
1229

1230
  taosMemoryFree(pResult);
10,755,761!
1231
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
10,765,976✔
1232
           pRequest->requestId);
1233

1234
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
10,765,449!
1235
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
295✔
1236
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
1237
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
295!
UNCOV
1238
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1239
    }
1240
    restartAsyncQuery(pRequest, code);
294✔
1241
    return;
295✔
1242
  }
1243

1244
  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
10,765,154!
1245
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
10,765,154!
1246
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
25,353!
UNCOV
1247
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1248
    }
1249
  }
1250

1251
  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
10,760,708✔
1252
  int32_t code1 = handleQueryExecRsp(pRequest);
10,760,708✔
1253
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
10,763,689!
UNCOV
1254
    pRequest->code = code1;
×
1255
  }
1256

1257
  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
21,530,337!
1258
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
10,764,167✔
UNCOV
1259
    continueInsertFromCsv(pWrapper, pRequest);
×
1260
    return;
22✔
1261
  }
1262

1263
  if (pRequest->relation.nextRefId) {
10,768,977✔
1264
    handlePostSubQuery(pWrapper);
439✔
1265
  } else {
1266
    destorySqlCallbackWrapper(pWrapper);
10,768,538✔
1267
    pRequest->pWrapper = NULL;
10,771,521✔
1268

1269
    // return to client
1270
    doRequestCallback(pRequest, code);
10,771,521✔
1271
  }
1272
}
1273

1274
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
58,186✔
1275
  int32_t code = 0;
58,186✔
1276
  int32_t subplanNum = 0;
58,186✔
1277

1278
  if (pQuery->pRoot) {
58,186✔
1279
    pRequest->stmtType = pQuery->pRoot->type;
57,081✔
1280
  }
1281

1282
  if (pQuery->pRoot && !pRequest->inRetry) {
58,186!
1283
    STscObj*            pTscObj = pRequest->pTscObj;
57,110✔
1284
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
57,110✔
1285
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
57,110✔
1286
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
57,093✔
1287
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
17!
1288
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
22✔
1289
    }
1290
  }
1291

1292
  pRequest->body.execMode = pQuery->execMode;
58,255✔
1293
  switch (pQuery->execMode) {
58,255!
UNCOV
1294
    case QUERY_EXEC_MODE_LOCAL:
×
UNCOV
1295
      if (!pRequest->validateOnly) {
×
1296
        if (NULL == pQuery->pRoot) {
×
1297
          terrno = TSDB_CODE_INVALID_PARA;
×
1298
          code = terrno;
×
1299
        } else {
1300
          code = execLocalCmd(pRequest, pQuery);
×
1301
        }
1302
      }
UNCOV
1303
      break;
×
1304
    case QUERY_EXEC_MODE_RPC:
1,130✔
1305
      if (!pRequest->validateOnly) {
1,130✔
1306
        code = execDdlQuery(pRequest, pQuery);
1,129✔
1307
      }
1308
      break;
1,135✔
1309
    case QUERY_EXEC_MODE_SCHEDULE: {
57,125✔
1310
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
57,125✔
1311
      if (NULL == pMnodeList) {
57,091!
UNCOV
1312
        code = terrno;
×
UNCOV
1313
        break;
×
1314
      }
1315
      SQueryPlan* pDag = NULL;
57,091✔
1316
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
57,091✔
1317
      if (TSDB_CODE_SUCCESS == code) {
57,093!
1318
        pRequest->body.subplanNum = pDag->numOfSubplans;
57,101✔
1319
        if (!pRequest->validateOnly) {
57,101✔
1320
          SArray* pNodeList = NULL;
57,090✔
1321
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
57,090✔
1322
          if (TSDB_CODE_SUCCESS == code) {
57,104!
1323
            code = scheduleQuery(pRequest, pDag, pNodeList);
57,116✔
1324
          }
1325
          taosArrayDestroy(pNodeList);
57,086✔
1326
        }
1327
      }
1328
      taosArrayDestroy(pMnodeList);
57,136✔
1329
      break;
57,134✔
1330
    }
UNCOV
1331
    case QUERY_EXEC_MODE_EMPTY_RESULT:
×
UNCOV
1332
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
1333
      break;
×
1334
    default:
×
1335
      break;
×
1336
  }
1337

1338
  if (!keepQuery) {
58,269!
UNCOV
1339
    qDestroyQuery(pQuery);
×
1340
  }
1341

1342
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
58,269!
1343
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
205!
1344
    if (TSDB_CODE_SUCCESS != ret) {
205!
UNCOV
1345
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
×
1346
               pRequest->requestId);
1347
    }
1348
  }
1349

1350
  if (TSDB_CODE_SUCCESS == code) {
58,269✔
1351
    code = handleQueryExecRsp(pRequest);
58,254✔
1352
  }
1353

1354
  if (TSDB_CODE_SUCCESS != code) {
58,279✔
1355
    pRequest->code = code;
137✔
1356
  }
1357

1358
  if (res) {
58,279!
UNCOV
1359
    *res = pRequest->body.resInfo.execRes.res;
×
UNCOV
1360
    pRequest->body.resInfo.execRes.res = NULL;
×
1361
  }
1362
}
58,279✔
1363

1364
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
10,747,978✔
1365
                                 SSqlCallbackWrapper* pWrapper) {
1366
  int32_t code = TSDB_CODE_SUCCESS;
10,747,978✔
1367
  pRequest->type = pQuery->msgType;
10,747,978✔
1368
  SArray*     pMnodeList = NULL;
10,747,978✔
1369
  SQueryPlan* pDag = NULL;
10,747,978✔
1370
  int64_t     st = taosGetTimestampUs();
10,744,762✔
1371

1372
  if (!pRequest->parseOnly) {
10,744,762!
1373
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
10,750,377✔
1374
    if (NULL == pMnodeList) {
10,744,708!
UNCOV
1375
      code = terrno;
×
1376
    }
1377
    SPlanContext cxt = {.queryId = pRequest->requestId,
21,518,300✔
1378
                        .acctId = pRequest->pTscObj->acctId,
10,744,708✔
1379
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
10,744,708✔
1380
                        .pAstRoot = pQuery->pRoot,
10,773,592✔
1381
                        .showRewrite = pQuery->showRewrite,
10,773,592✔
1382
                        .isView = pWrapper->pParseCtx->isView,
10,773,592✔
1383
                        .isAudit = pWrapper->pParseCtx->isAudit,
10,773,592✔
1384
                        .pMsg = pRequest->msgBuf,
10,773,592✔
1385
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1386
                        .pUser = pRequest->pTscObj->user,
10,773,592✔
1387
                        .sysInfo = pRequest->pTscObj->sysInfo,
10,773,592✔
1388
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
10,773,592✔
1389
                        .allocatorId = pRequest->allocatorRefId};
10,773,592✔
1390
    if (TSDB_CODE_SUCCESS == code) {
10,773,592✔
1391
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
10,773,079✔
1392
    }
1393
    if (code) {
10,725,773✔
1394
      tscError("req:0x%" PRIx64 ", failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
674!
1395
               pRequest->requestId);
1396
    } else {
1397
      pRequest->body.subplanNum = pDag->numOfSubplans;
10,725,099✔
1398
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
10,725,099✔
1399
    }
1400
  }
1401

1402
  pRequest->metric.execStart = taosGetTimestampUs();
10,740,349✔
1403
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
10,740,349✔
1404

1405
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
21,474,917!
1406
    SArray* pNodeList = NULL;
10,749,040✔
1407
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
10,749,040✔
1408
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
942,308✔
1409
    }
1410

1411
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
10,749,067✔
1412
                             .requestId = pRequest->requestId,
10,715,560✔
1413
                             .requestObjRefId = pRequest->self};
10,715,560✔
1414
    SSchedulerReq    req = {
21,419,228✔
1415
           .syncReq = false,
1416
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
10,715,560✔
1417
           .pConn = &conn,
1418
           .pNodeList = pNodeList,
1419
           .pDag = pDag,
1420
           .allocatorRefId = pRequest->allocatorRefId,
10,715,560✔
1421
           .sql = pRequest->sqlstr,
10,715,560✔
1422
           .startTs = pRequest->metric.start,
10,715,560✔
1423
           .execFp = schedulerExecCb,
1424
           .cbParam = pWrapper,
1425
           .chkKillFp = chkRequestKilled,
1426
           .chkKillParam = (void*)pRequest->self,
10,715,560✔
1427
           .pExecRes = NULL,
1428
           .source = pRequest->source,
10,715,560✔
1429
           .pWorkerCb = getTaskPoolWorkerCb(),
10,715,560✔
1430
    };
1431
    if (TSDB_CODE_SUCCESS == code) {
10,703,668!
1432
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
10,710,690✔
1433
    }
1434

1435
    taosArrayDestroy(pNodeList);
10,732,574✔
1436
  } else {
UNCOV
1437
    qDestroyQueryPlan(pDag);
×
1438
    tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
904✔
1439
             pRequest->requestId);
1440
    destorySqlCallbackWrapper(pWrapper);
904✔
1441
    pRequest->pWrapper = NULL;
904✔
1442
    if (TSDB_CODE_SUCCESS != code) {
904✔
1443
      pRequest->code = terrno;
674✔
1444
    }
1445

1446
    doRequestCallback(pRequest, code);
904✔
1447
  }
1448

1449
  // todo not to be released here
1450
  taosArrayDestroy(pMnodeList);
10,735,472✔
1451

1452
  return code;
10,737,720✔
1453
}
1454

1455
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
10,867,376✔
1456
  int32_t code = 0;
10,867,376✔
1457

1458
  if (pRequest->parseOnly) {
10,867,376✔
1459
    doRequestCallback(pRequest, 0);
626✔
1460
    return;
626✔
1461
  }
1462

1463
  pRequest->body.execMode = pQuery->execMode;
10,866,750✔
1464
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
10,866,750✔
1465
    destorySqlCallbackWrapper(pWrapper);
173,235✔
1466
    pRequest->pWrapper = NULL;
173,199✔
1467
  }
1468

1469
  if (pQuery->pRoot && !pRequest->inRetry) {
10,866,714!
1470
    STscObj*            pTscObj = pRequest->pTscObj;
10,913,026✔
1471
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
10,913,026✔
1472
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
10,913,026✔
1473
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
9,811,764✔
1474
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
9,671,424✔
1475
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
1,241,602✔
1476
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
843,308✔
1477
    }
1478
  }
1479

1480
  switch (pQuery->execMode) {
10,944,834!
1481
    case QUERY_EXEC_MODE_LOCAL:
130,461✔
1482
      asyncExecLocalCmd(pRequest, pQuery);
130,461✔
1483
      break;
130,461✔
1484
    case QUERY_EXEC_MODE_RPC:
40,064✔
1485
      code = asyncExecDdlQuery(pRequest, pQuery);
40,064✔
1486
      break;
40,096✔
1487
    case QUERY_EXEC_MODE_SCHEDULE: {
10,771,592✔
1488
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
10,771,592✔
1489
      break;
10,731,574✔
1490
    }
1491
    case QUERY_EXEC_MODE_EMPTY_RESULT:
2,717✔
1492
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
2,717✔
1493
      doRequestCallback(pRequest, 0);
2,717✔
1494
      break;
2,717✔
UNCOV
1495
    default:
×
UNCOV
1496
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
×
1497
      doRequestCallback(pRequest, -1);
×
1498
      break;
×
1499
  }
1500
}
1501

1502
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
36✔
1503
  SCatalog* pCatalog = NULL;
36✔
1504
  int32_t   code = 0;
36✔
1505
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
36✔
1506
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
36✔
1507

1508
  if (dbNum <= 0 && tblNum <= 0) {
36!
1509
    return TSDB_CODE_APP_ERROR;
36✔
1510
  }
1511

UNCOV
1512
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
×
UNCOV
1513
  if (code != TSDB_CODE_SUCCESS) {
×
1514
    return code;
×
1515
  }
1516

UNCOV
1517
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
×
UNCOV
1518
                           .requestId = pRequest->requestId,
×
1519
                           .requestObjRefId = pRequest->self,
×
1520
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
×
1521

1522
  for (int32_t i = 0; i < dbNum; ++i) {
×
UNCOV
1523
    char* dbFName = taosArrayGet(pRequest->dbList, i);
×
1524

1525
    // catalogRefreshDBVgInfo will handle dbFName == null.
UNCOV
1526
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
×
UNCOV
1527
    if (code != TSDB_CODE_SUCCESS) {
×
1528
      return code;
×
1529
    }
1530
  }
1531

UNCOV
1532
  for (int32_t i = 0; i < tblNum; ++i) {
×
UNCOV
1533
    SName* tableName = taosArrayGet(pRequest->tableList, i);
×
1534

1535
    // catalogRefreshTableMeta will handle tableName == null.
UNCOV
1536
    code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
×
UNCOV
1537
    if (code != TSDB_CODE_SUCCESS) {
×
1538
      return code;
×
1539
    }
1540
  }
1541

UNCOV
1542
  return code;
×
1543
}
1544

1545
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
28,313✔
1546
  SCatalog* pCatalog = NULL;
28,313✔
1547
  int32_t   tbNum = taosArrayGetSize(tbList);
28,313✔
1548
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
28,312✔
1549
  if (code != TSDB_CODE_SUCCESS) {
28,312!
UNCOV
1550
    return code;
×
1551
  }
1552

1553
  if (isView) {
28,312✔
1554
    for (int32_t i = 0; i < tbNum; ++i) {
848✔
1555
      SName* pViewName = taosArrayGet(tbList, i);
424✔
1556
      char   dbFName[TSDB_DB_FNAME_LEN];
1557
      if (NULL == pViewName) {
424!
UNCOV
1558
        continue;
×
1559
      }
1560
      (void)tNameGetFullDbName(pViewName, dbFName);
424✔
1561
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
424!
1562
    }
1563
  } else {
1564
    for (int32_t i = 0; i < tbNum; ++i) {
45,935✔
1565
      SName* pTbName = taosArrayGet(tbList, i);
18,047✔
1566
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
18,047!
1567
    }
1568
  }
1569

1570
  return TSDB_CODE_SUCCESS;
28,312✔
1571
}
1572

1573
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
32,412✔
1574
  pEpSet->version = 0;
32,412✔
1575

1576
  // init mnode ip set
1577
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
32,412✔
1578
  mgmtEpSet->numOfEps = 0;
32,412✔
1579
  mgmtEpSet->inUse = 0;
32,412✔
1580

1581
  if (firstEp && firstEp[0] != 0) {
32,412!
1582
    if (strlen(firstEp) >= TSDB_EP_LEN) {
32,673!
UNCOV
1583
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
UNCOV
1584
      return -1;
×
1585
    }
1586

1587
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
32,673✔
1588
    if (code != TSDB_CODE_SUCCESS) {
32,626!
UNCOV
1589
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
UNCOV
1590
      return terrno;
×
1591
    }
1592
    // uint32_t addr = 0;
1593
    SIpAddr addr = {0};
32,626✔
1594
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
32,626✔
1595
    if (code) {
32,572!
UNCOV
1596
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
×
1597
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1598
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
4✔
1599
    } else {
1600
      mgmtEpSet->numOfEps++;
32,610✔
1601
    }
1602
  }
1603

1604
  if (secondEp && secondEp[0] != 0) {
32,353!
1605
    if (strlen(secondEp) >= TSDB_EP_LEN) {
20,936!
UNCOV
1606
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
UNCOV
1607
      return terrno;
×
1608
    }
1609

1610
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
20,936✔
1611
    if (code != TSDB_CODE_SUCCESS) {
20,937!
UNCOV
1612
      return code;
×
1613
    }
1614
    SIpAddr addr = {0};
20,937✔
1615
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
20,937✔
1616
    if (code) {
20,933!
UNCOV
1617
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
×
1618
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1619
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
×
1620
    } else {
1621
      mgmtEpSet->numOfEps++;
20,933✔
1622
    }
1623
  }
1624

1625
  if (mgmtEpSet->numOfEps == 0) {
32,350✔
1626
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
4✔
1627
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
4✔
1628
  }
1629

1630
  return 0;
32,346✔
1631
}
1632

1633
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
32,725✔
1634
                        SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1635
  *pTscObj = NULL;
32,725✔
1636
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
32,725✔
1637
  if (TSDB_CODE_SUCCESS != code) {
32,725!
UNCOV
1638
    return code;
×
1639
  }
1640

1641
  SRequestObj* pRequest = NULL;
32,725✔
1642
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
32,725✔
1643
  if (TSDB_CODE_SUCCESS != code) {
32,704!
UNCOV
1644
    destroyTscObj(*pTscObj);
×
UNCOV
1645
    return code;
×
1646
  }
1647

1648
  pRequest->sqlstr = taosStrdup("taos_connect");
32,704!
1649
  if (pRequest->sqlstr) {
32,716!
1650
    pRequest->sqlLen = strlen(pRequest->sqlstr);
32,716✔
1651
  } else {
UNCOV
1652
    return terrno;
×
1653
  }
1654

1655
  SMsgSendInfo* body = NULL;
32,716✔
1656
  code = buildConnectMsg(pRequest, &body);
32,716✔
1657
  if (TSDB_CODE_SUCCESS != code) {
32,652!
UNCOV
1658
    destroyTscObj(*pTscObj);
×
UNCOV
1659
    return code;
×
1660
  }
1661

1662
  // int64_t transporterId = 0;
1663
  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
32,652✔
1664
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
32,700✔
1665
  if (TSDB_CODE_SUCCESS != code) {
32,719!
UNCOV
1666
    destroyTscObj(*pTscObj);
×
UNCOV
1667
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
UNCOV
1668
    return code;
×
1669
  }
1670
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
32,719✔
1671
    destroyTscObj(*pTscObj);
1✔
UNCOV
1672
    tscError("failed to wait sem, code:%s", terrstr());
×
UNCOV
1673
    return terrno;
×
1674
  }
1675
  if (pRequest->code != TSDB_CODE_SUCCESS) {
32,723✔
1676
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
57!
1677
    tscError("failed to connect to server, reason: %s", errorMsg);
57!
1678

1679
    terrno = pRequest->code;
57✔
1680
    destroyRequest(pRequest);
57✔
1681
    taos_close_internal(*pTscObj);
57✔
1682
    *pTscObj = NULL;
57✔
1683
    return terrno;
57✔
1684
  } else {
1685
    tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
32,666!
1686
            (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1687
    destroyRequest(pRequest);
32,671✔
1688
  }
1689
  return code;
32,659✔
1690
}
1691

1692
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
32,692✔
1693
  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
32,692!
1694
  if (*pMsgSendInfo == NULL) {
32,705!
UNCOV
1695
    return terrno;
×
1696
  }
1697

1698
  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
32,705✔
1699

1700
  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
32,705✔
1701
  (*pMsgSendInfo)->requestId = pRequest->requestId;
32,705✔
1702
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
32,705✔
1703
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
32,697!
1704
  if (NULL == (*pMsgSendInfo)->param) {
32,693!
UNCOV
1705
    taosMemoryFree(*pMsgSendInfo);
×
UNCOV
1706
    return terrno;
×
1707
  }
1708

1709
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
32,693✔
1710

1711
  SConnectReq connectReq = {0};
32,693✔
1712
  STscObj*    pObj = pRequest->pTscObj;
32,693✔
1713

1714
  char* db = getDbOfConnection(pObj);
32,693✔
1715
  if (db != NULL) {
32,723✔
1716
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
1,546✔
1717
  } else if (terrno) {
31,177!
UNCOV
1718
    taosMemoryFree(*pMsgSendInfo);
×
UNCOV
1719
    return terrno;
×
1720
  }
1721
  taosMemoryFreeClear(db);
32,712!
1722

1723
  connectReq.connType = pObj->connType;
32,712✔
1724
  connectReq.pid = appInfo.pid;
32,712✔
1725
  connectReq.startTime = appInfo.startTime;
32,712✔
1726

1727
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
32,712✔
1728
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
32,712✔
1729
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
32,712✔
1730
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
32,712✔
1731

1732
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
32,712✔
1733
  void*   pReq = taosMemoryMalloc(contLen);
32,660!
1734
  if (NULL == pReq) {
32,700!
UNCOV
1735
    taosMemoryFree(*pMsgSendInfo);
×
UNCOV
1736
    return terrno;
×
1737
  }
1738

1739
  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
32,700✔
1740
    taosMemoryFree(*pMsgSendInfo);
23!
UNCOV
1741
    taosMemoryFree(pReq);
×
UNCOV
1742
    return terrno;
×
1743
  }
1744

1745
  (*pMsgSendInfo)->msgInfo.len = contLen;
32,658✔
1746
  (*pMsgSendInfo)->msgInfo.pData = pReq;
32,658✔
1747
  return TSDB_CODE_SUCCESS;
32,658✔
1748
}
1749

1750
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
14,603,729✔
1751
  if (NULL == pEpSet) {
14,603,729✔
1752
    return;
13,147,862✔
1753
  }
1754

1755
  switch (pSendInfo->target.type) {
1,455,867✔
1756
    case TARGET_TYPE_MNODE:
23✔
1757
      if (NULL == pTscObj) {
23!
UNCOV
1758
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1759
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1760
        return;
2✔
1761
      }
1762

1763
      SEpSet  originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
23✔
1764
      SEpSet* pOrig = &originEpset;
23✔
1765
      SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
23✔
1766
      SEp*    pNewEp = &pEpSet->eps[pEpSet->inUse];
23✔
1767
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
23!
1768
               pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
1769
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
23✔
1770
      break;
1,456,936✔
1771
    case TARGET_TYPE_VNODE: {
812,332✔
1772
      if (NULL == pTscObj) {
812,332✔
1773
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
2!
1774
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1775
        return;
2✔
1776
      }
1777

1778
      SCatalog* pCatalog = NULL;
812,330✔
1779
      int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
812,330✔
1780
      if (code != TSDB_CODE_SUCCESS) {
812,326!
UNCOV
1781
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1782
                 tstrerror(code));
UNCOV
1783
        return;
×
1784
      }
1785

1786
      code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
812,326✔
1787
      if (code != TSDB_CODE_SUCCESS) {
812,336!
UNCOV
1788
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1789
                 tstrerror(code));
UNCOV
1790
        return;
×
1791
      }
1792
      taosMemoryFreeClear(pSendInfo->target.dbFName);
812,337!
1793
      break;
812,331✔
1794
    }
1795
    default:
643,512✔
1796
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
643,512!
1797
      break;
644,582✔
1798
  }
1799
}
1800

1801
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
14,613,543✔
1802
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
14,613,543✔
1803
  if (pMsg->info.ahandle == NULL) {
14,613,543✔
1804
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
556!
1805
    rpcFreeCont(pMsg->pCont);
556✔
1806
    taosMemoryFree(pEpSet);
556!
1807
    return TSDB_CODE_TSC_INTERNAL_ERROR;
556✔
1808
  }
1809

1810
  STscObj* pTscObj = NULL;
14,612,987✔
1811

1812
  STraceId* trace = &pMsg->info.traceId;
14,612,987✔
1813
  char      tbuf[40] = {0};
14,612,987✔
1814
  TRACE_TO_STR(trace, tbuf);
14,612,987!
1815

1816
  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
14,610,224!
1817
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));
1818

1819
  if (pSendInfo->requestObjRefId != 0) {
14,610,399✔
1820
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
13,116,429✔
1821
    if (pRequest) {
13,114,463✔
1822
      if (pRequest->self != pSendInfo->requestObjRefId) {
13,111,883!
UNCOV
1823
        tscError("doProcessMsgFromServer req:0x%" PRId64 " != pSendInfo->requestObjRefId:0x%" PRId64, pRequest->self,
×
1824
                 pSendInfo->requestObjRefId);
1825

UNCOV
1826
        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
×
1827
          tscError("doProcessMsgFromServer taosReleaseRef failed");
×
1828
        }
UNCOV
1829
        rpcFreeCont(pMsg->pCont);
×
1830
        taosMemoryFree(pEpSet);
×
1831
        destroySendMsgInfo(pSendInfo);
×
UNCOV
1832
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1833
      }
1834
      pTscObj = pRequest->pTscObj;
13,111,883✔
1835
    }
1836
  }
1837

1838
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
14,608,433✔
1839

1840
  SDataBuf buf = {.msgType = pMsg->msgType,
14,603,194✔
1841
                  .len = pMsg->contLen,
14,603,194✔
1842
                  .pData = NULL,
1843
                  .handle = pMsg->info.handle,
14,603,194✔
1844
                  .handleRefId = pMsg->info.refId,
14,603,194✔
1845
                  .pEpSet = pEpSet};
1846

1847
  if (pMsg->contLen > 0) {
14,603,194✔
1848
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
14,486,744!
1849
    if (buf.pData == NULL) {
14,487,917!
UNCOV
1850
      pMsg->code = terrno;
×
1851
    } else {
1852
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
14,487,917✔
1853
    }
1854
  }
1855

1856
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
14,604,367✔
1857

1858
  if (pTscObj) {
14,591,939✔
1859
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
13,097,548✔
1860
    if (TSDB_CODE_SUCCESS != code) {
13,109,595!
UNCOV
1861
      tscError("doProcessMsgFromServer taosReleaseRef failed");
×
UNCOV
1862
      terrno = code;
×
UNCOV
1863
      pMsg->code = code;
×
1864
    }
1865
  }
1866

1867
  rpcFreeCont(pMsg->pCont);
14,603,986✔
1868
  destroySendMsgInfo(pSendInfo);
14,610,705✔
1869
  return TSDB_CODE_SUCCESS;
14,612,674✔
1870
}
1871

1872
int32_t doProcessMsgFromServer(void* param) {
14,615,129✔
1873
  AsyncArg* arg = (AsyncArg*)param;
14,615,129✔
1874
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
14,615,129✔
1875
  taosMemoryFree(arg);
14,610,716!
1876
  return code;
14,613,686✔
1877
}
1878

1879
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
14,587,088✔
1880
  int32_t code = 0;
14,587,088✔
1881
  SEpSet* tEpSet = NULL;
14,587,088✔
1882

1883
  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);
14,587,088✔
1884

1885
  if (pEpSet != NULL) {
14,588,161✔
1886
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
1,456,938!
1887
    if (NULL == tEpSet) {
1,456,932!
UNCOV
1888
      code = terrno;
×
UNCOV
1889
      pMsg->code = terrno;
×
UNCOV
1890
      goto _exit;
×
1891
    }
1892
    (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
1,456,932✔
1893
  }
1894

1895
  // pMsg is response msg
1896
  if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
14,588,155✔
1897
    // restore origin code
1898
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
32,710!
UNCOV
1899
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
×
1900
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
32,710!
UNCOV
1901
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
×
1902
    }
1903
  } else {
1904
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
1905
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
14,555,445!
UNCOV
1906
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
×
1907
    }
1908
  }
1909

1910
  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
14,588,155!
1911
  if (NULL == arg) {
14,591,558!
UNCOV
1912
    code = terrno;
×
UNCOV
1913
    pMsg->code = code;
×
UNCOV
1914
    goto _exit;
×
1915
  }
1916

1917
  arg->msg = *pMsg;
14,591,558✔
1918
  arg->pEpset = tEpSet;
14,591,558✔
1919

1920
  if ((code = taosAsyncExec(doProcessMsgFromServer, arg, NULL)) != 0) {
14,591,558!
UNCOV
1921
    pMsg->code = code;
×
UNCOV
1922
    taosMemoryFree(arg);
×
UNCOV
1923
    goto _exit;
×
1924
  }
1925
  return;
14,607,874✔
1926

1927
_exit:
×
UNCOV
1928
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
×
UNCOV
1929
  code = doProcessMsgFromServerImpl(pMsg, tEpSet);
×
UNCOV
1930
  if (code != 0) {
×
1931
    tscError("failed to sched msg to tsc, tsc ready quit");
×
1932
  }
1933
}
1934

1935
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
3✔
1936
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
3!
1937
  if (user == NULL) {
3!
UNCOV
1938
    user = TSDB_DEFAULT_USER;
×
1939
  }
1940

1941
  if (auth == NULL) {
3!
1942
    tscError("No auth info is given, failed to connect to server");
×
UNCOV
1943
    return NULL;
×
1944
  }
1945

1946
  STscObj* pObj = NULL;
3✔
1947
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
3✔
1948
  if (TSDB_CODE_SUCCESS == code) {
3✔
1949
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1!
1950
    if (NULL == rid) {
1!
UNCOV
1951
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
1952
    }
1953
    *rid = pObj->id;
1✔
1954
    return (TAOS*)rid;
1✔
1955
  }
1956

1957
  return NULL;
2✔
1958
}
1959

1960
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
1961
//                      const char* db, int dbLen, uint16_t port) {
1962
//   char ipStr[TSDB_EP_LEN] = {0};
1963
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
1964
//   char userStr[TSDB_USER_LEN] = {0};
1965
//   char passStr[TSDB_PASSWORD_LEN] = {0};
1966
//
1967
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
1968
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
1969
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
1970
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
1971
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
1972
// }
1973

1974
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
35,538,914✔
1975
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
177,099,963✔
1976
    SResultColumn* pCol = &pResultInfo->pCol[i];
141,547,471✔
1977

1978
    int32_t type = pResultInfo->fields[i].type;
141,547,471✔
1979
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->userFields[i].bytes, false);
141,547,471✔
1980

1981
    if (IS_VAR_DATA_TYPE(type)) {
141,561,049✔
1982
      if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pCol->offset[pResultInfo->current] != -1) {
27,511,882✔
1983
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
26,140,777✔
1984

1985
        pResultInfo->length[i] = varDataLen(pStart);
26,140,777✔
1986
        pResultInfo->row[i] = varDataVal(pStart);
26,140,777✔
1987
      } else {
1988
        pResultInfo->row[i] = NULL;
1,371,105✔
1989
        pResultInfo->length[i] = 0;
1,371,105✔
1990
      }
1991
    } else {
1992
      if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
114,049,167✔
1993
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + schemaBytes * pResultInfo->current;
108,437,446✔
1994
        pResultInfo->length[i] = schemaBytes;
108,437,446✔
1995
      } else {
1996
        pResultInfo->row[i] = NULL;
5,611,721✔
1997
        pResultInfo->length[i] = 0;
5,611,721✔
1998
      }
1999
    }
2000
  }
2001
}
35,552,492✔
2002

UNCOV
2003
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
×
UNCOV
2004
  if (pRequest == NULL) {
×
UNCOV
2005
    return NULL;
×
2006
  }
2007

2008
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
×
2009
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
×
2010
    // All data has returned to App already, no need to try again
UNCOV
2011
    if (pResultInfo->completed) {
×
2012
      pResultInfo->numOfRows = 0;
×
2013
      return NULL;
×
2014
    }
2015

2016
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
×
2017
    SSchedulerReq   req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
×
2018

UNCOV
2019
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
×
2020
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2021
      pResultInfo->numOfRows = 0;
×
UNCOV
2022
      return NULL;
×
2023
    }
2024

2025
    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData,
×
2026
                                           convertUcs4, pRequest->isStmtBind);
×
UNCOV
2027
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
UNCOV
2028
      pResultInfo->numOfRows = 0;
×
2029
      return NULL;
×
2030
    }
2031

2032
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
×
2033
             ", complete:%d, QID:0x%" PRIx64,
2034
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
2035

2036
    STscObj*            pTscObj = pRequest->pTscObj;
×
UNCOV
2037
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
×
UNCOV
2038
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
×
2039

2040
    if (pResultInfo->numOfRows == 0) {
×
2041
      return NULL;
×
2042
    }
2043
  }
2044

2045
  if (setupOneRowPtr) {
×
UNCOV
2046
    doSetOneRowPtr(pResultInfo);
×
UNCOV
2047
    pResultInfo->current += 1;
×
2048
  }
2049

2050
  return pResultInfo->row;
×
2051
}
2052

2053
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
965,802✔
2054
  tsem_t* sem = param;
965,802✔
2055
  if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
965,802!
UNCOV
2056
    tscError("failed to post sem, code:%s", terrstr());
×
2057
  }
2058
}
965,801✔
2059

2060
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
20,646,747✔
2061
  if (pRequest == NULL) {
20,646,747!
UNCOV
2062
    return NULL;
×
2063
  }
2064

2065
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
20,646,747✔
2066
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
20,646,747✔
2067
    // All data has returned to App already, no need to try again
2068
    if (pResultInfo->completed) {
1,674,269✔
2069
      pResultInfo->numOfRows = 0;
708,476✔
2070
      return NULL;
708,476✔
2071
    }
2072

2073
    // convert ucs4 to native multi-bytes string
2074
    pResultInfo->convertUcs4 = convertUcs4;
965,793✔
2075
    tsem_t sem;
2076
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
965,793!
UNCOV
2077
      tscError("failed to init sem, code:%s", terrstr());
×
2078
    }
2079
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
965,796✔
2080
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
965,803!
2081
      tscError("failed to wait sem, code:%s", terrstr());
×
2082
    }
2083
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
965,793!
UNCOV
2084
      tscError("failed to destroy sem, code:%s", terrstr());
×
2085
    }
2086
    pRequest->inCallback = false;
965,790✔
2087
  }
2088

2089
  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
19,938,268!
2090
    return NULL;
62,369✔
2091
  } else {
2092
    if (setupOneRowPtr) {
19,875,899✔
2093
      doSetOneRowPtr(pResultInfo);
18,998,238✔
2094
      pResultInfo->current += 1;
18,994,205✔
2095
    }
2096

2097
    return pResultInfo->row;
19,871,866✔
2098
  }
2099
}
2100

2101
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
1,441,147✔
2102
  if (pResInfo->row == NULL) {
1,441,147✔
2103
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
1,217,542!
2104
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
1,217,572!
2105
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
1,217,584!
2106
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
1,217,583!
2107

2108
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
1,217,589!
2109
      taosMemoryFree(pResInfo->row);
10!
UNCOV
2110
      taosMemoryFree(pResInfo->pCol);
×
UNCOV
2111
      taosMemoryFree(pResInfo->length);
×
UNCOV
2112
      taosMemoryFree(pResInfo->convertBuf);
×
UNCOV
2113
      return terrno;
×
2114
    }
2115
  }
2116

2117
  return TSDB_CODE_SUCCESS;
1,441,184✔
2118
}
2119

2120
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
1,440,936✔
2121
  int32_t idx = -1;
1,440,936✔
2122
  iconv_t conv = taosAcquireConv(&idx, C2M, pResultInfo->charsetCxt);
1,440,936✔
2123
  if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
1,440,975!
2124

2125
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
6,864,769✔
2126
    int32_t type = pResultInfo->fields[i].type;
5,423,768✔
2127
    int32_t schemaBytes =
2128
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
5,423,768✔
2129

2130
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
5,423,766✔
2131
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
251,194!
2132
      if (p == NULL) {
251,195!
UNCOV
2133
        taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
×
UNCOV
2134
        return terrno;
×
2135
      }
2136

2137
      pResultInfo->convertBuf[i] = p;
251,195✔
2138

2139
      SResultColumn* pCol = &pResultInfo->pCol[i];
251,195✔
2140
      for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
39,761,553✔
2141
        if (pCol->offset[j] != -1) {
39,510,331✔
2142
          char* pStart = pCol->offset[j] + pCol->pData;
34,032,395✔
2143

2144
          int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p), conv);
34,032,395✔
2145
          if (len < 0 || len > schemaBytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
34,032,428!
2146
            tscError(
6!
2147
                "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
2148
                "colLength[i]):%p",
2149
                len, schemaBytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
2150
            taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
6✔
UNCOV
2151
            return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2152
          }
2153

2154
          varDataSetLen(p, len);
34,032,422✔
2155
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
34,032,422✔
2156
          p += (len + VARSTR_HEADER_SIZE);
34,032,422✔
2157
        }
2158
      }
2159

2160
      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
251,222✔
2161
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
251,222✔
2162
    }
2163
  }
2164
  taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
1,441,001✔
2165
  return TSDB_CODE_SUCCESS;
1,440,976✔
2166
}
2167

2168
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
1,440,974✔
2169
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
6,865,029✔
2170
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + i;
5,424,055✔
2171
    TAOS_FIELD*   pField = pResultInfo->userFields + i;
5,424,055✔
2172
    int32_t       type = pFieldE->type;
5,424,055✔
2173
    int32_t       bufLen = 0;
5,424,055✔
2174
    char*         p = NULL;
5,424,055✔
2175
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[i].pData) {
5,424,055!
2176
      continue;
5,416,062✔
2177
    } else {
2178
      bufLen = 64;
7,993✔
2179
      p = taosMemoryRealloc(pResultInfo->convertBuf[i], bufLen * pResultInfo->numOfRows);
7,993!
2180
      pFieldE->bytes = bufLen;
7,993✔
2181
      pField->bytes = bufLen;
7,993✔
2182
    }
2183
    if (!p) return terrno;
7,993!
2184
    pResultInfo->convertBuf[i] = p;
7,993✔
2185

2186
    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
6,463,798✔
2187
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[i].pData + j * tDataTypes[type].bytes), type,
6,455,805✔
2188
                                  pFieldE->precision, pFieldE->scale, p, bufLen);
6,455,805✔
2189
      p += bufLen;
6,455,805✔
2190
      if (TSDB_CODE_SUCCESS != code) {
6,455,805!
UNCOV
2191
        return code;
×
2192
      }
2193
    }
2194
    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
7,993✔
2195
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
7,993✔
2196
  }
2197
  return 0;
1,440,974✔
2198
}
2199

2200
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
1,264✔
2201
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t) +
1,264✔
2202
         numOfCols * (sizeof(int8_t) + sizeof(int32_t));
2203
}
2204

2205
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
632✔
2206
  char*   p = (char*)pResultInfo->pData;
632✔
2207
  int32_t blockVersion = *(int32_t*)p;
632✔
2208

2209
  int32_t numOfRows = pResultInfo->numOfRows;
632✔
2210
  int32_t numOfCols = pResultInfo->numOfCols;
632✔
2211

2212
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2213
  // length |
2214
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
632✔
2215
  if (numOfCols != cols) {
632!
UNCOV
2216
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
UNCOV
2217
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2218
  }
2219

2220
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
632✔
2221
  int32_t* colLength = (int32_t*)(p + len);
632✔
2222
  len += sizeof(int32_t) * numOfCols;
632✔
2223

2224
  char* pStart = p + len;
632✔
2225
  for (int32_t i = 0; i < numOfCols; ++i) {
2,883✔
2226
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
2,251!
2227

2228
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,251✔
2229
      int32_t* offset = (int32_t*)pStart;
709✔
2230
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
709✔
2231
      len += lenTmp;
709✔
2232
      pStart += lenTmp;
709✔
2233

2234
      int32_t estimateColLen = 0;
709✔
2235
      for (int32_t j = 0; j < numOfRows; ++j) {
3,604✔
2236
        if (offset[j] == -1) {
2,895✔
2237
          continue;
220✔
2238
        }
2239
        char* data = offset[j] + pStart;
2,675✔
2240

2241
        int32_t jsonInnerType = *data;
2,675✔
2242
        char*   jsonInnerData = data + CHAR_BYTES;
2,675✔
2243
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
2,675✔
2244
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
72✔
2245
        } else if (tTagIsJson(data)) {
2,603✔
2246
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
841✔
2247
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
1,762✔
2248
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
1,466✔
2249
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
296✔
2250
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
216✔
2251
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
80!
2252
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
80✔
2253
        } else {
UNCOV
2254
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
UNCOV
2255
          return -1;
×
2256
        }
2257
      }
2258
      len += TMAX(colLen, estimateColLen);
709✔
2259
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,542!
2260
      int32_t lenTmp = numOfRows * sizeof(int32_t);
396✔
2261
      len += (lenTmp + colLen);
396✔
2262
      pStart += lenTmp;
396✔
2263
    } else {
2264
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
1,146✔
2265
      len += (lenTmp + colLen);
1,146✔
2266
      pStart += lenTmp;
1,146✔
2267
    }
2268
    pStart += colLen;
2,251✔
2269
  }
2270

2271
  // Ensure the complete structure of the block, including the blankfill field,
2272
  // even though it is not used on the client side.
2273
  len += sizeof(bool);
632✔
2274
  return len;
632✔
2275
}
2276

2277
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
1,441,185✔
2278
  int32_t numOfRows = pResultInfo->numOfRows;
1,441,185✔
2279
  int32_t numOfCols = pResultInfo->numOfCols;
1,441,185✔
2280
  bool    needConvert = false;
1,441,185✔
2281
  for (int32_t i = 0; i < numOfCols; ++i) {
6,865,661✔
2282
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
5,425,108✔
2283
      needConvert = true;
632✔
2284
      break;
632✔
2285
    }
2286
  }
2287

2288
  if (!needConvert) {
1,441,185✔
2289
    return TSDB_CODE_SUCCESS;
1,440,561✔
2290
  }
2291

2292
  tscDebug("start to convert form json format string");
624✔
2293

2294
  char*   p = (char*)pResultInfo->pData;
624✔
2295
  int32_t blockVersion = *(int32_t*)p;
624✔
2296
  int32_t dataLen = estimateJsonLen(pResultInfo);
624✔
2297
  if (dataLen <= 0) {
632!
UNCOV
2298
    tscError("doConvertJson error: estimateJsonLen failed");
×
UNCOV
2299
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2300
  }
2301

2302
  taosMemoryFreeClear(pResultInfo->convertJson);
632!
2303
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
632!
2304
  if (pResultInfo->convertJson == NULL) return terrno;
632!
2305
  char* p1 = pResultInfo->convertJson;
632✔
2306

2307
  int32_t totalLen = 0;
632✔
2308
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
632✔
2309
  if (numOfCols != cols) {
632!
UNCOV
2310
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
×
UNCOV
2311
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2312
  }
2313

2314
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
632✔
2315
  (void)memcpy(p1, p, len);
632✔
2316

2317
  p += len;
632✔
2318
  p1 += len;
632✔
2319
  totalLen += len;
632✔
2320

2321
  len = sizeof(int32_t) * numOfCols;
632✔
2322
  int32_t* colLength = (int32_t*)p;
632✔
2323
  int32_t* colLength1 = (int32_t*)p1;
632✔
2324
  (void)memcpy(p1, p, len);
632✔
2325
  p += len;
632✔
2326
  p1 += len;
632✔
2327
  totalLen += len;
632✔
2328

2329
  char* pStart = p;
632✔
2330
  char* pStart1 = p1;
632✔
2331
  for (int32_t i = 0; i < numOfCols; ++i) {
2,883✔
2332
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
2,251!
2333
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
2,251!
2334
    if (colLen >= dataLen) {
2,251!
UNCOV
2335
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
×
UNCOV
2336
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2337
    }
2338
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
2,251✔
2339
      int32_t* offset = (int32_t*)pStart;
709✔
2340
      int32_t* offset1 = (int32_t*)pStart1;
709✔
2341
      len = numOfRows * sizeof(int32_t);
709✔
2342
      (void)memcpy(pStart1, pStart, len);
709✔
2343
      pStart += len;
709✔
2344
      pStart1 += len;
709✔
2345
      totalLen += len;
709✔
2346

2347
      len = 0;
709✔
2348
      for (int32_t j = 0; j < numOfRows; ++j) {
3,604✔
2349
        if (offset[j] == -1) {
2,895✔
2350
          continue;
220✔
2351
        }
2352
        char* data = offset[j] + pStart;
2,675✔
2353

2354
        int32_t jsonInnerType = *data;
2,675✔
2355
        char*   jsonInnerData = data + CHAR_BYTES;
2,675✔
2356
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
2,675✔
2357
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
2,675✔
2358
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
72✔
2359
          varDataSetLen(dst, strlen(varDataVal(dst)));
72✔
2360
        } else if (tTagIsJson(data)) {
2,603✔
2361
          char* jsonString = NULL;
841✔
2362
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
841✔
2363
          if (jsonString == NULL) {
841!
UNCOV
2364
            tscError("doConvertJson error: parseTagDatatoJson failed");
×
UNCOV
2365
            return terrno;
×
2366
          }
2367
          STR_TO_VARSTR(dst, jsonString);
841✔
2368
          taosMemoryFree(jsonString);
841!
2369
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
1,762✔
2370
          *(char*)varDataVal(dst) = '\"';
1,466✔
2371
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData),
1,466✔
2372
                                         varDataVal(dst) + CHAR_BYTES, pResultInfo->charsetCxt);
2373
          if (length <= 0) {
1,466✔
2374
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
4!
2375
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
2376
            length = 0;
4✔
2377
          }
2378
          varDataSetLen(dst, length + CHAR_BYTES * 2);
1,466✔
2379
          *(char*)POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES) = '\"';
1,466✔
2380
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
296✔
2381
          double jsonVd = *(double*)(jsonInnerData);
216✔
2382
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
216✔
2383
          varDataSetLen(dst, strlen(varDataVal(dst)));
216✔
2384
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
80!
2385
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
80✔
2386
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
80✔
2387
          varDataSetLen(dst, strlen(varDataVal(dst)));
80✔
2388
        } else {
UNCOV
2389
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
×
UNCOV
2390
          return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2391
        }
2392

2393
        offset1[j] = len;
2,675✔
2394
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
2,675✔
2395
        len += varDataTLen(dst);
2,675✔
2396
      }
2397
      colLen1 = len;
709✔
2398
      totalLen += colLen1;
709✔
2399
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
709!
2400
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,542!
2401
      len = numOfRows * sizeof(int32_t);
396✔
2402
      (void)memcpy(pStart1, pStart, len);
396✔
2403
      pStart += len;
396✔
2404
      pStart1 += len;
396✔
2405
      totalLen += len;
396✔
2406
      totalLen += colLen;
396✔
2407
      (void)memcpy(pStart1, pStart, colLen);
396✔
2408
    } else {
2409
      len = BitmapLen(pResultInfo->numOfRows);
1,146✔
2410
      (void)memcpy(pStart1, pStart, len);
1,146✔
2411
      pStart += len;
1,146✔
2412
      pStart1 += len;
1,146✔
2413
      totalLen += len;
1,146✔
2414
      totalLen += colLen;
1,146✔
2415
      (void)memcpy(pStart1, pStart, colLen);
1,146✔
2416
    }
2417
    pStart += colLen;
2,251✔
2418
    pStart1 += colLen1;
2,251✔
2419
  }
2420

2421
  // Ensure the complete structure of the block, including the blankfill field,
2422
  // even though it is not used on the client side.
2423
  // (void)memcpy(pStart1, pStart, sizeof(bool));
2424
  totalLen += sizeof(bool);
632✔
2425

2426
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
632✔
2427
  pResultInfo->pData = pResultInfo->convertJson;
632✔
2428
  return TSDB_CODE_SUCCESS;
632✔
2429
}
2430

2431
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
1,509,782✔
2432
  bool convertForDecimal = convertUcs4;
1,509,782✔
2433
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
1,509,782!
UNCOV
2434
    tscError("setResultDataPtr paras error");
×
UNCOV
2435
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2436
  }
2437

2438
  if (pResultInfo->numOfRows == 0) {
1,509,799✔
2439
    return TSDB_CODE_SUCCESS;
68,637✔
2440
  }
2441

2442
  if (pResultInfo->pData == NULL) {
1,441,162!
UNCOV
2443
    tscError("setResultDataPtr error: pData is NULL");
×
UNCOV
2444
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2445
  }
2446

2447
  int32_t code = doPrepareResPtr(pResultInfo);
1,441,162✔
2448
  if (code != TSDB_CODE_SUCCESS) {
1,441,186!
UNCOV
2449
    return code;
×
2450
  }
2451
  code = doConvertJson(pResultInfo);
1,441,186✔
2452
  if (code != TSDB_CODE_SUCCESS) {
1,441,182!
2453
    return code;
×
2454
  }
2455

2456
  char* p = (char*)pResultInfo->pData;
1,441,182✔
2457

2458
  // version:
2459
  int32_t blockVersion = *(int32_t*)p;
1,441,182✔
2460
  p += sizeof(int32_t);
1,441,182✔
2461

2462
  int32_t dataLen = *(int32_t*)p;
1,441,182✔
2463
  p += sizeof(int32_t);
1,441,182✔
2464

2465
  int32_t rows = *(int32_t*)p;
1,441,182✔
2466
  p += sizeof(int32_t);
1,441,182✔
2467

2468
  int32_t cols = *(int32_t*)p;
1,441,182✔
2469
  p += sizeof(int32_t);
1,441,182✔
2470

2471
  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
1,441,182!
2472
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
1!
2473
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
UNCOV
2474
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2475
  }
2476

2477
  int32_t hasColumnSeg = *(int32_t*)p;
1,441,181✔
2478
  p += sizeof(int32_t);
1,441,181✔
2479

2480
  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)p);
1,441,181✔
2481
  p += sizeof(uint64_t);
1,441,181✔
2482

2483
  // check fields
2484
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
6,866,259✔
2485
    int8_t type = *(int8_t*)p;
5,425,075✔
2486
    p += sizeof(int8_t);
5,425,075✔
2487

2488
    int32_t bytes = *(int32_t*)p;
5,425,075✔
2489
    p += sizeof(int32_t);
5,425,075✔
2490

2491
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
5,425,075!
UNCOV
2492
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
×
2493
    }
2494
  }
2495

2496
  int32_t* colLength = (int32_t*)p;
1,441,184✔
2497
  p += sizeof(int32_t) * pResultInfo->numOfCols;
1,441,184✔
2498

2499
  char* pStart = p;
1,441,184✔
2500
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
6,865,772✔
2501
    if ((pStart - pResultInfo->pData) >= dataLen) {
5,424,629!
UNCOV
2502
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
×
UNCOV
2503
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2504
    }
2505
    if (blockVersion == BLOCK_VERSION_1) {
5,424,629✔
2506
      colLength[i] = htonl(colLength[i]);
3,612,383✔
2507
    }
2508
    if (colLength[i] >= dataLen) {
5,424,629!
UNCOV
2509
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
×
UNCOV
2510
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2511
    }
2512
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
5,424,629!
2513
      tscError("invalid type %d", pResultInfo->fields[i].type);
25!
2514
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2515
    }
2516
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
5,424,604✔
2517
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
1,333,719✔
2518
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
1,333,719✔
2519
    } else {
2520
      pResultInfo->pCol[i].nullbitmap = pStart;
4,090,885✔
2521
      pStart += BitmapLen(pResultInfo->numOfRows);
4,090,885✔
2522
    }
2523

2524
    pResultInfo->pCol[i].pData = pStart;
5,424,604✔
2525
    pResultInfo->length[i] =
10,849,192✔
2526
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
5,424,604✔
2527
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
5,424,588✔
2528

2529
    pStart += colLength[i];
5,424,588✔
2530
  }
2531

2532
  p = pStart;
1,441,143✔
2533
  // bool blankFill = *(bool*)p;
2534
  p += sizeof(bool);
1,441,143✔
2535
  int32_t offset = p - pResultInfo->pData;
1,441,143✔
2536
  if (offset > dataLen) {
1,441,143!
UNCOV
2537
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
×
UNCOV
2538
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2539
  }
2540

2541
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
2542
  if (convertUcs4) {
1,441,143✔
2543
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
1,440,950✔
2544
  }
2545
#endif
2546
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
1,441,174!
2547
    code = convertDecimalType(pResultInfo);
1,440,985✔
2548
  }
2549
  return code;
1,441,154✔
2550
}
2551

2552
char* getDbOfConnection(STscObj* pObj) {
11,191,358✔
2553
  terrno = TSDB_CODE_SUCCESS;
11,191,358✔
2554
  char* p = NULL;
11,191,628✔
2555
  (void)taosThreadMutexLock(&pObj->mutex);
11,191,628✔
2556
  size_t len = strlen(pObj->db);
11,199,346✔
2557
  if (len > 0) {
11,199,346✔
2558
    p = taosStrndup(pObj->db, tListLen(pObj->db));
10,645,621!
2559
    if (p == NULL) {
10,639,433!
UNCOV
2560
      tscError("failed to taosStrndup db name");
×
2561
    }
2562
  }
2563

2564
  (void)taosThreadMutexUnlock(&pObj->mutex);
11,193,158✔
2565
  return p;
11,199,576✔
2566
}
2567

2568
void setConnectionDB(STscObj* pTscObj, const char* db) {
8,241✔
2569
  if (db == NULL || pTscObj == NULL) {
8,241!
UNCOV
2570
    tscError("setConnectionDB para is NULL");
×
UNCOV
2571
    return;
×
2572
  }
2573

2574
  (void)taosThreadMutexLock(&pTscObj->mutex);
8,244✔
2575
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
8,247✔
2576
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
8,247✔
2577
}
2578

UNCOV
2579
void resetConnectDB(STscObj* pTscObj) {
×
UNCOV
2580
  if (pTscObj == NULL) {
×
UNCOV
2581
    return;
×
2582
  }
2583

2584
  (void)taosThreadMutexLock(&pTscObj->mutex);
×
2585
  pTscObj->db[0] = 0;
×
UNCOV
2586
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
×
2587
}
2588

2589
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
1,181,055✔
2590
                              bool isStmt) {
2591
  if (pResultInfo == NULL || pRsp == NULL) {
1,181,055!
UNCOV
2592
    tscError("setQueryResultFromRsp paras is null");
×
UNCOV
2593
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2594
  }
2595

2596
  taosMemoryFreeClear(pResultInfo->pRspMsg);
1,181,072!
2597
  pResultInfo->pRspMsg = (const char*)pRsp;
1,181,072✔
2598
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
1,181,072✔
2599
  pResultInfo->current = 0;
1,181,057✔
2600
  pResultInfo->completed = (pRsp->completed == 1);
1,181,057✔
2601
  pResultInfo->precision = pRsp->precision;
1,181,057✔
2602

2603
  // decompress data if needed
2604
  int32_t payloadLen = htonl(pRsp->payloadLen);
1,181,057✔
2605

2606
  if (pRsp->compressed) {
1,181,057✔
2607
    if (pResultInfo->decompBuf == NULL) {
649✔
2608
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
14!
2609
      if (pResultInfo->decompBuf == NULL) {
14!
UNCOV
2610
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
UNCOV
2611
        return terrno;
×
2612
      }
2613
      pResultInfo->decompBufSize = payloadLen;
14✔
2614
    } else {
2615
      if (pResultInfo->decompBufSize < payloadLen) {
635✔
2616
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
24!
2617
        if (p == NULL) {
24!
UNCOV
2618
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
UNCOV
2619
          return terrno;
×
2620
        }
2621

2622
        pResultInfo->decompBuf = p;
24✔
2623
        pResultInfo->decompBufSize = payloadLen;
24✔
2624
      }
2625
    }
2626
  }
2627

2628
  if (payloadLen > 0) {
1,181,057✔
2629
    int32_t compLen = *(int32_t*)pRsp->data;
1,112,421✔
2630
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
1,112,421✔
2631

2632
    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
1,112,421✔
2633

2634
    if (pRsp->compressed && compLen < rawLen) {
1,112,421!
2635
      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
649✔
2636
      if (len < 0) {
649!
UNCOV
2637
        tscError("tsDecompressString failed");
×
UNCOV
2638
        return terrno ? terrno : TSDB_CODE_FAILED;
×
2639
      }
2640
      if (len != rawLen) {
649!
2641
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
×
2642
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2643
      }
2644
      pResultInfo->pData = pResultInfo->decompBuf;
649✔
2645
      pResultInfo->payloadLen = rawLen;
649✔
2646
    } else {
2647
      pResultInfo->pData = pStart;
1,111,772✔
2648
      pResultInfo->payloadLen = htonl(pRsp->compLen);
1,111,772✔
2649
      if (pRsp->compLen != pRsp->payloadLen) {
1,111,772!
UNCOV
2650
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
×
UNCOV
2651
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2652
      }
2653
    }
2654
  }
2655

2656
  // TODO handle the compressed case
2657
  pResultInfo->totalRows += pResultInfo->numOfRows;
1,181,057✔
2658

2659
  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
1,181,057✔
2660
  return code;
1,181,069✔
2661
}
2662

2663
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
4✔
2664
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
4✔
2665
  void*              clientRpc = NULL;
4✔
2666
  SServerStatusRsp   statusRsp = {0};
4✔
2667
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
4✔
2668
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
4✔
2669
  SRpcMsg  rpcRsp = {0};
4✔
2670
  SRpcInit rpcInit = {0};
4✔
2671
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
4✔
2672

2673
  rpcInit.label = "CHK";
4✔
2674
  rpcInit.numOfThreads = 1;
4✔
2675
  rpcInit.cfp = NULL;
4✔
2676
  rpcInit.sessions = 16;
4✔
2677
  rpcInit.connType = TAOS_CONN_CLIENT;
4✔
2678
  rpcInit.idleTime = tsShellActivityTimer * 1000;
4✔
2679
  rpcInit.compressSize = tsCompressMsgSize;
4✔
2680
  rpcInit.user = "_dnd";
4✔
2681

2682
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
4✔
2683
  connLimitNum = TMAX(connLimitNum, 10);
4✔
2684
  connLimitNum = TMIN(connLimitNum, 500);
4✔
2685
  rpcInit.connLimitNum = connLimitNum;
4✔
2686
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
4✔
2687
  rpcInit.readTimeout = tsReadTimeout;
4✔
2688
  rpcInit.ipv6 = tsEnableIpv6;
4✔
2689
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
4!
UNCOV
2690
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
UNCOV
2691
    goto _OVER;
×
2692
  }
2693

2694
  clientRpc = rpcOpen(&rpcInit);
4✔
2695
  if (clientRpc == NULL) {
4!
UNCOV
2696
    code = terrno;
×
UNCOV
2697
    tscError("failed to init server status client since %s", tstrerror(code));
×
UNCOV
2698
    goto _OVER;
×
2699
  }
2700

2701
  if (fqdn == NULL) {
4!
2702
    fqdn = tsLocalFqdn;
4✔
2703
  }
2704

2705
  if (port == 0) {
4!
2706
    port = tsServerPort;
4✔
2707
  }
2708

2709
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
4✔
2710
  epSet.eps[0].port = (uint16_t)port;
4✔
2711
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
4✔
2712
  if (TSDB_CODE_SUCCESS != ret) {
4!
UNCOV
2713
    tscError("failed to send recv since %s", tstrerror(ret));
×
UNCOV
2714
    goto _OVER;
×
2715
  }
2716

2717
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
4!
2718
    tscError("failed to send server status req since %s", terrstr());
1!
2719
    goto _OVER;
1✔
2720
  }
2721

2722
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
3!
UNCOV
2723
    tscError("failed to parse server status rsp since %s", terrstr());
×
UNCOV
2724
    goto _OVER;
×
2725
  }
2726

2727
  code = statusRsp.statusCode;
3✔
2728
  if (details != NULL) {
3!
2729
    tstrncpy(details, statusRsp.details, maxlen);
3✔
2730
  }
2731

UNCOV
2732
_OVER:
×
2733
  if (clientRpc != NULL) {
4!
2734
    rpcClose(clientRpc);
4✔
2735
  }
2736
  if (rpcRsp.pCont != NULL) {
4✔
2737
    rpcFreeCont(rpcRsp.pCont);
3✔
2738
  }
2739
  return code;
4✔
2740
}
2741

2742
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
4✔
2743
                      int32_t acctId, char* db) {
2744
  SName name = {0};
4✔
2745

2746
  if (len1 <= 0) {
4!
UNCOV
2747
    return -1;
×
2748
  }
2749

2750
  const char* dbName = db;
4✔
2751
  const char* tbName = NULL;
4✔
2752
  int32_t     dbLen = 0;
4✔
2753
  int32_t     tbLen = 0;
4✔
2754
  if (len2 > 0) {
4!
UNCOV
2755
    dbName = str + pos1;
×
UNCOV
2756
    dbLen = len1;
×
UNCOV
2757
    tbName = str + pos2;
×
UNCOV
2758
    tbLen = len2;
×
2759
  } else {
2760
    dbLen = strlen(db);
4✔
2761
    tbName = str + pos1;
4✔
2762
    tbLen = len1;
4✔
2763
  }
2764

2765
  if (dbLen <= 0 || tbLen <= 0) {
4!
UNCOV
2766
    return -1;
×
2767
  }
2768

2769
  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
4!
2770
    return -1;
×
2771
  }
2772

2773
  if (tNameAddTbName(&name, tbName, tbLen)) {
4!
2774
    return -1;
×
2775
  }
2776

2777
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
4✔
2778
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);
4✔
2779

2780
  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
4✔
2781
  if (pDb) {
4!
UNCOV
2782
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
×
UNCOV
2783
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
2784
    }
2785
  } else {
2786
    STablesReq db;
2787
    db.pTables = taosArrayInit(20, sizeof(SName));
4✔
2788
    if (NULL == db.pTables) {
4!
UNCOV
2789
      return terrno;
×
2790
    }
2791
    tstrncpy(db.dbFName, dbFName, TSDB_DB_FNAME_LEN);
4✔
2792
    if (NULL == taosArrayPush(db.pTables, &name)) {
8!
2793
      return terrno;
×
2794
    }
2795
    TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
4!
2796
  }
2797

2798
  return TSDB_CODE_SUCCESS;
4✔
2799
}
2800

2801
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
4✔
2802
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
4✔
2803
  if (NULL == pHash) {
4!
UNCOV
2804
    return terrno;
×
2805
  }
2806

2807
  bool    inEscape = false;
4✔
2808
  int32_t code = 0;
4✔
2809
  void*   pIter = NULL;
4✔
2810

2811
  int32_t vIdx = 0;
4✔
2812
  int32_t vPos[2];
2813
  int32_t vLen[2];
2814

2815
  (void)memset(vPos, -1, sizeof(vPos));
4✔
2816
  (void)memset(vLen, 0, sizeof(vLen));
4✔
2817

2818
  for (int32_t i = 0;; ++i) {
18✔
2819
    if (0 == *(tbList + i)) {
18✔
2820
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
4!
2821
        vLen[vIdx] = i - vPos[vIdx];
4✔
2822
      }
2823

2824
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
4✔
2825
      if (code) {
4!
UNCOV
2826
        goto _return;
×
2827
      }
2828

2829
      break;
4✔
2830
    }
2831

2832
    if ('`' == *(tbList + i)) {
14!
UNCOV
2833
      inEscape = !inEscape;
×
UNCOV
2834
      if (!inEscape) {
×
UNCOV
2835
        if (vPos[vIdx] >= 0) {
×
UNCOV
2836
          vLen[vIdx] = i - vPos[vIdx];
×
2837
        } else {
2838
          goto _return;
×
2839
        }
2840
      }
2841

2842
      continue;
×
2843
    }
2844

2845
    if (inEscape) {
14!
2846
      if (vPos[vIdx] < 0) {
×
UNCOV
2847
        vPos[vIdx] = i;
×
2848
      }
UNCOV
2849
      continue;
×
2850
    }
2851

2852
    if ('.' == *(tbList + i)) {
14!
2853
      if (vPos[vIdx] < 0) {
×
UNCOV
2854
        goto _return;
×
2855
      }
UNCOV
2856
      if (vLen[vIdx] <= 0) {
×
2857
        vLen[vIdx] = i - vPos[vIdx];
×
2858
      }
UNCOV
2859
      vIdx++;
×
2860
      if (vIdx >= 2) {
×
2861
        goto _return;
×
2862
      }
2863
      continue;
×
2864
    }
2865

2866
    if (',' == *(tbList + i)) {
14!
2867
      if (vPos[vIdx] < 0) {
×
UNCOV
2868
        goto _return;
×
2869
      }
UNCOV
2870
      if (vLen[vIdx] <= 0) {
×
2871
        vLen[vIdx] = i - vPos[vIdx];
×
2872
      }
2873

2874
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
×
2875
      if (code) {
×
UNCOV
2876
        goto _return;
×
2877
      }
2878

2879
      (void)memset(vPos, -1, sizeof(vPos));
×
2880
      (void)memset(vLen, 0, sizeof(vLen));
×
UNCOV
2881
      vIdx = 0;
×
UNCOV
2882
      continue;
×
2883
    }
2884

2885
    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
14!
2886
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
×
UNCOV
2887
        vLen[vIdx] = i - vPos[vIdx];
×
2888
      }
UNCOV
2889
      continue;
×
2890
    }
2891

2892
    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
14!
2893
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
1!
2894
      if (vLen[vIdx] > 0) {
14!
UNCOV
2895
        goto _return;
×
2896
      }
2897
      if (vPos[vIdx] < 0) {
14✔
2898
        vPos[vIdx] = i;
4✔
2899
      }
2900
      continue;
14✔
2901
    }
2902

UNCOV
2903
    goto _return;
×
2904
  }
2905

2906
  int32_t dbNum = taosHashGetSize(pHash);
4✔
2907
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
4✔
2908
  if (NULL == pReq) {
4!
UNCOV
2909
    TSC_ERR_JRET(terrno);
×
2910
  }
2911
  pIter = taosHashIterate(pHash, NULL);
4✔
2912
  while (pIter) {
8✔
2913
    STablesReq* pDb = (STablesReq*)pIter;
4✔
2914
    if (NULL == taosArrayPush(*pReq, pDb)) {
8!
UNCOV
2915
      TSC_ERR_JRET(terrno);
×
2916
    }
2917
    pIter = taosHashIterate(pHash, pIter);
4✔
2918
  }
2919

2920
  taosHashCleanup(pHash);
4✔
2921

2922
  return TSDB_CODE_SUCCESS;
4✔
2923

UNCOV
2924
_return:
×
2925

UNCOV
2926
  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
×
2927

2928
  pIter = taosHashIterate(pHash, NULL);
×
UNCOV
2929
  while (pIter) {
×
2930
    STablesReq* pDb = (STablesReq*)pIter;
×
UNCOV
2931
    taosArrayDestroy(pDb->pTables);
×
2932
    pIter = taosHashIterate(pHash, pIter);
×
2933
  }
2934

2935
  taosHashCleanup(pHash);
×
2936

UNCOV
2937
  return terrno;
×
2938
}
2939

2940
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
4✔
2941
  SSyncQueryParam* pParam = param;
4✔
2942
  pParam->pRequest->code = code;
4✔
2943

2944
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
4!
UNCOV
2945
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2946
  }
2947
}
4✔
2948

2949
void syncQueryFn(void* param, void* res, int32_t code) {
11,080,753✔
2950
  SSyncQueryParam* pParam = param;
11,080,753✔
2951
  pParam->pRequest = res;
11,080,753✔
2952

2953
  if (pParam->pRequest) {
11,080,753✔
2954
    pParam->pRequest->code = code;
11,079,580✔
2955
    clientOperateReport(pParam->pRequest);
11,079,580✔
2956
  }
2957

2958
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
11,082,560!
UNCOV
2959
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2960
  }
2961
}
11,089,485✔
2962

2963
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
11,075,044✔
2964
                        int8_t source) {
2965
  if (sql == NULL || NULL == fp) {
11,075,044!
UNCOV
2966
    terrno = TSDB_CODE_INVALID_PARA;
×
UNCOV
2967
    if (fp) {
×
UNCOV
2968
      fp(param, NULL, terrno);
×
2969
    }
2970

2971
    return;
1✔
2972
  }
2973

2974
  size_t sqlLen = strlen(sql);
11,084,485✔
2975
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
11,084,485✔
2976
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, TSDB_MAX_ALLOWED_SQL_LEN);
1!
2977
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
1✔
2978
    fp(param, NULL, terrno);
1✔
2979
    return;
1✔
2980
  }
2981

2982
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);
11,084,484✔
2983

2984
  SRequestObj* pRequest = NULL;
11,084,484✔
2985
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
11,084,484✔
2986
  if (code != TSDB_CODE_SUCCESS) {
11,074,947!
UNCOV
2987
    terrno = code;
×
UNCOV
2988
    fp(param, NULL, terrno);
×
UNCOV
2989
    return;
×
2990
  }
2991

2992
  pRequest->source = source;
11,074,947✔
2993
  pRequest->body.queryFp = fp;
11,074,947✔
2994
  doAsyncQuery(pRequest, false);
11,074,947✔
2995
}
2996

UNCOV
2997
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
×
2998
                                 int64_t reqid) {
UNCOV
2999
  if (sql == NULL || NULL == fp) {
×
UNCOV
3000
    terrno = TSDB_CODE_INVALID_PARA;
×
3001
    if (fp) {
×
UNCOV
3002
      fp(param, NULL, terrno);
×
3003
    }
3004

3005
    return;
×
3006
  }
3007

UNCOV
3008
  size_t sqlLen = strlen(sql);
×
3009
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
×
UNCOV
3010
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid,
×
3011
             TSDB_MAX_ALLOWED_SQL_LEN);
3012
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
3013
    fp(param, NULL, terrno);
×
3014
    return;
×
3015
  }
3016

3017
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);
×
3018

UNCOV
3019
  SRequestObj* pRequest = NULL;
×
UNCOV
3020
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
×
3021
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
3022
    terrno = code;
×
3023
    fp(param, NULL, terrno);
×
3024
    return;
×
3025
  }
3026

3027
  pRequest->body.queryFp = fp;
×
3028
  doAsyncQuery(pRequest, false);
×
3029
}
3030

3031
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
11,072,999✔
3032
  if (NULL == taos) {
11,072,999!
UNCOV
3033
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
UNCOV
3034
    return NULL;
×
3035
  }
3036

3037
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
11,072,999!
3038
  if (NULL == param) {
11,084,763!
UNCOV
3039
    return NULL;
×
3040
  }
3041
  int32_t code = tsem_init(&param->sem, 0, 0);
11,084,763✔
3042
  if (TSDB_CODE_SUCCESS != code) {
11,083,017!
3043
    taosMemoryFree(param);
×
UNCOV
3044
    return NULL;
×
3045
  }
3046

3047
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
11,083,017✔
3048
  code = tsem_wait(&param->sem);
11,035,677✔
3049
  if (TSDB_CODE_SUCCESS != code) {
11,087,251!
UNCOV
3050
    taosMemoryFree(param);
×
UNCOV
3051
    return NULL;
×
3052
  }
3053
  code = tsem_destroy(&param->sem);
11,087,251✔
3054
  if (TSDB_CODE_SUCCESS != code) {
11,085,616!
3055
    tscError("failed to destroy semaphore since %s", tstrerror(code));
×
3056
  }
3057

3058
  SRequestObj* pRequest = NULL;
11,086,511✔
3059
  if (param->pRequest != NULL) {
11,086,511✔
3060
    param->pRequest->syncQuery = true;
11,086,510✔
3061
    pRequest = param->pRequest;
11,086,510✔
3062
    param->pRequest->inCallback = false;
11,086,510✔
3063
  }
3064
  taosMemoryFree(param);
11,086,511!
3065

3066
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3067
  //          *(int64_t*)taos, pRequest);
3068

3069
  return pRequest;
11,080,275✔
3070
}
3071

UNCOV
3072
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
×
UNCOV
3073
  if (NULL == taos) {
×
UNCOV
3074
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
UNCOV
3075
    return NULL;
×
3076
  }
3077

3078
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
×
3079
  if (param == NULL) {
×
UNCOV
3080
    return NULL;
×
3081
  }
3082
  int32_t code = tsem_init(&param->sem, 0, 0);
×
3083
  if (TSDB_CODE_SUCCESS != code) {
×
3084
    taosMemoryFree(param);
×
UNCOV
3085
    return NULL;
×
3086
  }
3087

3088
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
×
3089
  code = tsem_wait(&param->sem);
×
UNCOV
3090
  if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
3091
    taosMemoryFree(param);
×
3092
    return NULL;
×
3093
  }
3094
  SRequestObj* pRequest = NULL;
×
3095
  if (param->pRequest != NULL) {
×
3096
    param->pRequest->syncQuery = true;
×
UNCOV
3097
    pRequest = param->pRequest;
×
3098
  }
3099
  taosMemoryFree(param);
×
3100

3101
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3102
  //   *(int64_t*)taos, pRequest);
3103

UNCOV
3104
  return pRequest;
×
3105
}
3106

3107
static void fetchCallback(void* pResult, void* param, int32_t code) {
1,060,865✔
3108
  SRequestObj* pRequest = (SRequestObj*)param;
1,060,865✔
3109

3110
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,060,865✔
3111

3112
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
1,060,865✔
3113
           tstrerror(code), pRequest->requestId);
3114

3115
  pResultInfo->pData = pResult;
1,060,865✔
3116
  pResultInfo->numOfRows = 0;
1,060,865✔
3117

3118
  if (code != TSDB_CODE_SUCCESS) {
1,060,865!
UNCOV
3119
    pRequest->code = code;
×
UNCOV
3120
    taosMemoryFreeClear(pResultInfo->pData);
×
UNCOV
3121
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
UNCOV
3122
    return;
×
3123
  }
3124

3125
  if (pRequest->code != TSDB_CODE_SUCCESS) {
1,060,865!
3126
    taosMemoryFreeClear(pResultInfo->pData);
×
UNCOV
3127
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
UNCOV
3128
    return;
×
3129
  }
3130

3131
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
2,121,745✔
3132
                                         pResultInfo->convertUcs4, pRequest->isStmtBind);
1,060,865✔
3133
  if (pRequest->code != TSDB_CODE_SUCCESS) {
1,060,880!
UNCOV
3134
    pResultInfo->numOfRows = 0;
×
UNCOV
3135
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
×
3136
             tstrerror(pRequest->code), pRequest->requestId);
3137
  } else {
3138
    tscDebug(
1,060,880✔
3139
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3140
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3141

3142
    STscObj*            pTscObj = pRequest->pTscObj;
1,060,881✔
3143
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
1,060,881✔
3144
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
1,060,881✔
3145
  }
3146

3147
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
1,060,894✔
3148
}
3149

3150
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
1,143,746✔
3151
  pRequest->body.fetchFp = fp;
1,143,746✔
3152
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
1,143,746✔
3153

3154
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,143,746✔
3155

3156
  // this query has no results or error exists, return directly
3157
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
1,143,746!
3158
    pResultInfo->numOfRows = 0;
2✔
3159
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
2✔
3160
    return;
82,851✔
3161
  }
3162

3163
  // all data has returned to App already, no need to try again
3164
  if (pResultInfo->completed) {
1,143,738✔
3165
    // it is a local executed query, no need to do async fetch
3166
    if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
82,850✔
3167
      if (pResultInfo->localResultFetched) {
598✔
3168
        pResultInfo->numOfRows = 0;
299✔
3169
        pResultInfo->current = 0;
299✔
3170
      } else {
3171
        pResultInfo->localResultFetched = true;
299✔
3172
      }
3173
    } else {
3174
      pResultInfo->numOfRows = 0;
82,252✔
3175
    }
3176

3177
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
82,850✔
3178
    return;
82,850✔
3179
  }
3180

3181
  SSchedulerReq req = {
1,060,888✔
3182
      .syncReq = false,
3183
      .fetchFp = fetchCallback,
3184
      .cbParam = pRequest,
3185
  };
3186

3187
  int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
1,060,888✔
3188
  if (TSDB_CODE_SUCCESS != code) {
1,060,890!
UNCOV
3189
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
×
3190
    // pRequest->body.fetchFp(param, pRequest, code);
3191
  }
3192
}
3193

3194
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
11,079,106✔
3195
  pRequest->inCallback = true;
11,079,106✔
3196
  int64_t this = pRequest->self;
11,079,106✔
3197
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
11,079,106!
3198
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
316!
UNCOV
3199
    code = TSDB_CODE_SUCCESS;
×
UNCOV
3200
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
3201
  }
3202

3203
  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
11,079,106✔
3204
           pRequest);
3205

3206
  if (pRequest->body.queryFp != NULL) {
11,079,109!
3207
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
11,079,461✔
3208
  }
3209

3210
  SRequestObj* pReq = acquireRequest(this);
11,088,517✔
3211
  if (pReq != NULL) {
11,087,570✔
3212
    pReq->inCallback = false;
11,082,316✔
3213
    (void)releaseRequest(this);
11,082,316✔
3214
  }
3215
}
11,083,294✔
3216

3217
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
943✔
3218
                       SParseSqlRes* pRes) {
3219
#ifndef TD_ENTERPRISE
3220
  return TSDB_CODE_SUCCESS;
3221
#else
3222
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
943✔
3223
#endif
3224
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc