• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3552

11 Dec 2024 06:08AM UTC coverage: 62.526% (+0.7%) from 61.798%
#3552

push

travis-ci

web-flow
Merge pull request #29092 from taosdata/fix/3.0/TD-33146

fix:[TD-33146] stmt_get_tag_fields return error code

124833 of 255773 branches covered (48.81%)

Branch coverage included in aggregate %.

209830 of 279467 relevant lines covered (75.08%)

19111707.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.47
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "command.h"
21
#include "scheduler.h"
22
#include "tdatablock.h"
23
#include "tdataformat.h"
24
#include "tdef.h"
25
#include "tglobal.h"
26
#include "tmsgtype.h"
27
#include "tpagedbuf.h"
28
#include "tref.h"
29
#include "tsched.h"
30
#include "tversion.h"
31
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
32
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
33

34
void setQueryRequest(int64_t rId) {
1,524,429✔
35
  SRequestObj* pReq = acquireRequest(rId);
1,524,429✔
36
  if (pReq != NULL) {
1,524,436✔
37
    pReq->isQuery = true;
1,524,420✔
38
    (void)releaseRequest(rId);
1,524,420✔
39
  }
40
}
1,524,428✔
41

42
// Returns true when str is non-NULL, non-empty and no longer than maxsize characters
// (terminator not counted); returns false otherwise.
static bool stringLengthCheck(const char* str, size_t maxsize) {
  if (NULL == str) {
    return false;
  }

  const size_t n = strlen(str);
  return (n >= 1) && (n <= maxsize);
}
54

55
// A user name is accepted when it is non-empty and at most TSDB_USER_LEN - 1 characters long.
static bool validateUserName(const char* user) {
  const size_t maxLen = TSDB_USER_LEN - 1;
  return stringLengthCheck(user, maxLen);
}
56

57
// A password is accepted when it is non-empty and at most TSDB_PASSWORD_LEN - 1 characters long.
static bool validatePassword(const char* passwd) {
  const size_t maxLen = TSDB_PASSWORD_LEN - 1;
  return stringLengthCheck(passwd, maxLen);
}
58

59
// A database name is accepted when it is non-empty and at most TSDB_DB_NAME_LEN - 1 characters long.
static bool validateDbName(const char* db) {
  const size_t maxLen = TSDB_DB_NAME_LEN - 1;
  return stringLengthCheck(db, maxLen);
}
60

61
// Builds the "user:auth:ip:port" key used to look up an application instance.
//
// Returns a heap copy made with taosStrdup (NULL if that fails); the caller owns it.
// The key is formatted into a 512-byte buffer, so longer inputs are silently truncated.
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
  // The caller passes ip == NULL when no explicit endpoint was given. Handing NULL to "%s" is
  // undefined behaviour, so substitute the literal text that common C libraries print for it;
  // NOTE(review): this keeps the key text unchanged on glibc - confirm for other targets.
  const char* ipText = (NULL != ip) ? ip : "(null)";

  char key[512] = {0};
  (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ipText, port);
  return taosStrdup(key);
}
66

67
bool chkRequestKilled(void* param) {
35,100,598✔
68
  bool         killed = false;
35,100,598✔
69
  SRequestObj* pRequest = acquireRequest((int64_t)param);
35,100,598✔
70
  if (NULL == pRequest || pRequest->killed) {
35,759,712!
71
    killed = true;
2✔
72
  }
73

74
  (void)releaseRequest((int64_t)param);
35,759,712✔
75

76
  return killed;
35,647,001✔
77
}
78

79
void cleanupAppInfo() {
×
80
  taosHashCleanup(appInfo.pInstMap);
×
81
  taosHashCleanup(appInfo.pInstMapByClusterId);
×
82
}
×
83

84
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
85
                               SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
86

87
// Validates the connection parameters, finds or creates the application instance that matches
// the (user, auth, ip, port) key, and hands over to taosConnectImpl to open the connection.
//
// ip       optional endpoint; when NULL the configured tsFirst/tsSecond endpoints are used
// user     user name, must pass validateUserName
// pass     plain password, only used (and validated) when auth is NULL
// auth     optional pre-computed secret, copied as-is when non-NULL
// db       optional database name; empty or NULL means no default database
// port     optional port override applied to the first two endpoints when non-zero
// connType forwarded to taosConnectImpl
// pObj     receives the connection object from taosConnectImpl
//
// Returns TSDB_CODE_SUCCESS or the first error code encountered.
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
                              uint16_t port, int connType, STscObj** pObj) {
  TSC_ERR_RET(taos_init());
  if (!validateUserName(user)) {
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
  }

  char localDb[TSDB_DB_NAME_LEN] = {0};
  if (db != NULL && strlen(db) > 0) {
    if (!validateDbName(db)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
    }

    tstrncpy(localDb, db, sizeof(localDb));
    (void)strdequote(localDb);
  }

  char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
  if (auth == NULL) {
    if (!validatePassword(pass)) {
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
    }

    taosEncryptPass_c((uint8_t*)pass, strlen(pass), secretEncrypt);
  } else {
    tstrncpy(secretEncrypt, auth, tListLen(secretEncrypt));
  }

  SCorEpSet epSet = {0};
  if (ip) {
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
  } else {
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
  }

  if (port) {
    epSet.epSet.eps[0].port = port;
    epSet.epSet.eps[1].port = port;
  }

  char* key = getClusterKey(user, secretEncrypt, ip, port);
  if (NULL == key) {
    TSC_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
  }
  // db may legitimately be NULL here; never hand NULL to a "%s" conversion.
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
          user, (NULL != db) ? db : "(null)", key);
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
  }

  SAppInstInfo** pInst = NULL;
  SAppInstInfo*  pAppInst = NULL;  // instance to connect through, captured while the lock is held
  int32_t        code = taosThreadMutexLock(&appInfo.mutex);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
    // key is still owned by this function on this path; release it before bailing out.
    taosMemoryFree(key);
    TSC_ERR_RET(code);
  }

  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
  SAppInstInfo* p = NULL;
  if (pInst == NULL) {
    // No instance for this key yet: build one and register it.
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
    if (NULL == p) {
      TSC_ERR_JRET(terrno);
    }
    p->mgmtEp = epSet;
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
    if (TSDB_CODE_SUCCESS != code) {
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = openTransporter(user, secretEncrypt, tsNumOfCores / 2, &p->pTransporter);
    if (TSDB_CODE_SUCCESS != code) {
      // NOTE(review): qnodeMutex was initialised above and is not destroyed on this path - confirm.
      taosMemoryFree(p);
      TSC_ERR_JRET(code);
    }
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
    if (TSDB_CODE_SUCCESS != code) {
      destroyAppInst(&p);
      TSC_ERR_JRET(code);
    }
    // Ownership of key moves to the instance; clear the local so it is not freed below.
    p->instKey = key;
    key = NULL;
    tscDebug("new app inst mgr %p, user:%s, ip:%s, port:%d", p, user, epSet.epSet.eps[0].fqdn, epSet.epSet.eps[0].port);

    pAppInst = p;
  } else {
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
    }
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);

    // Copy the instance pointer out of the hash slot now, rather than re-reading the slot
    // after the mutex has been released.
    pAppInst = *pInst;
  }

_return:

  if (TSDB_CODE_SUCCESS != code) {
    (void)taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    return code;
  } else {
    code = taosThreadMutexUnlock(&appInfo.mutex);
    taosMemoryFreeClear(key);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
      return code;
    }
    return taosConnectImpl(user, &secretEncrypt[0], localDb, NULL, NULL, pAppInst, connType, pObj);
  }
}
202

203
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
204
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
205
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
206
//     return *ppAppInstInfo;
207
//   } else {
208
//     return NULL;
209
//   }
210
// }
211

212
// Destroys the semaphore embedded in param and frees param itself.
// A NULL param is ignored; a failed semaphore destroy is logged and the memory is still freed.
void freeQueryParam(SSyncQueryParam* param) {
  if (NULL != param) {
    if (tsem_destroy(&param->sem) != TSDB_CODE_SUCCESS) {
      tscError("failed to destroy semaphore in freeQueryParam");
    }
    taosMemoryFree(param);
  }
}
219

220
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
10,930,135✔
221
                     SRequestObj** pRequest, int64_t reqid) {
222
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
10,930,135✔
223
  if (TSDB_CODE_SUCCESS != code) {
10,940,478!
224
    tscError("failed to malloc sqlObj, %s", sql);
×
225
    return code;
×
226
  }
227

228
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
10,940,478✔
229
  if ((*pRequest)->sqlstr == NULL) {
10,929,348!
230
    tscError("0x%" PRIx64 " failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
231
    destroyRequest(*pRequest);
×
232
    *pRequest = NULL;
×
233
    return terrno;
×
234
  }
235

236
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
10,929,348✔
237
  (*pRequest)->sqlstr[sqlLen] = 0;
10,935,179✔
238
  (*pRequest)->sqlLen = sqlLen;
10,935,179✔
239
  (*pRequest)->validateOnly = validateSql;
10,935,179✔
240
  (*pRequest)->isStmtBind = false;
10,935,179✔
241

242
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
10,935,179✔
243

244
  STscObj* pTscObj = (*pRequest)->pTscObj;
10,935,179✔
245
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
10,935,179✔
246
                             sizeof((*pRequest)->self));
247
  if (err) {
10,933,528!
248
    tscError("%" PRId64 " failed to add to request container,QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
249
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
250
    destroyRequest(*pRequest);
×
251
    *pRequest = NULL;
×
252
    return terrno;
×
253
  }
254

255
  (*pRequest)->allocatorRefId = -1;
10,933,528✔
256
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
10,933,528✔
257
    if (TSDB_CODE_SUCCESS !=
1,297,058!
258
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
1,297,047✔
259
      tscError("%" PRId64 " failed to create node allocator,QID:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self,
×
260
               (*pRequest)->requestId, pTscObj->id, sql);
261
      destroyRequest(*pRequest);
×
262
      *pRequest = NULL;
×
263
      return terrno;
×
264
    }
265
  }
266

267
  tscDebugL("0x%" PRIx64 " SQL: %s,QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
10,933,720✔
268
  return TSDB_CODE_SUCCESS;
10,929,238✔
269
}
270

271
// Builds a sub-request for sql on the same connection as pRequest and links the two:
// pRequest records the new request as its predecessor, and the new request records pRequest
// as both its successor and its user request. Returns the result of buildRequest.
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
  int32_t code =
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestObj* pSub = *pNewRequest;
  pRequest->relation.prevRefId = pSub->self;
  pSub->relation.nextRefId = pRequest->self;
  pSub->relation.userRefId = pRequest->self;
  pSub->isSubReq = true;
  return code;
}
282

283
// Parses the SQL text stored in pRequest into *pQuery.
//
// A parse context is filled from the request and its connection, the catalog handle of the
// connection's cluster is obtained, and qParseSql is run. When the parsed query has a result
// set, the request's result schema and precision are initialised from it. On success, or on
// an error matching NEED_CLIENT_HANDLE_ERROR, the db/table/target-table lists are swapped
// from the query into the request.
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
  STscObj*      pTscObj = pRequest->pTscObj;
  SAppInstInfo* pAppInfo = pTscObj->pAppInfo;

  SParseContext cxt = {
      .requestId = pRequest->requestId,
      .requestRid = pRequest->self,
      .acctId = pTscObj->acctId,
      .db = pRequest->pDb,
      .topicQuery = topicQuery,
      .pSql = pRequest->sqlstr,
      .sqlLen = pRequest->sqlLen,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pTransporter = pAppInfo->pTransporter,
      .pStmtCb = pStmtCb,
      .pUser = pTscObj->user,
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
      .enableSysInfo = pTscObj->sysInfo,
      .svrVer = pTscObj->sVer,
      .nodeOffline = (pAppInfo->onlineDnodes < pAppInfo->totalDnodes),
      .isStmtBind = pRequest->isStmtBind,
      .setQueryFp = setQueryRequest,
      .timezone = pTscObj->optionInfo.timezone,
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
  };
  cxt.mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp);

  int32_t code = catalogGetHandle(pAppInfo->clusterId, &cxt.pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  code = qParseSql(&cxt, pQuery);
  if (TSDB_CODE_SUCCESS == code && (*pQuery)->haveResultSet) {
    code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols);
    setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
  }

  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
  }

  // The position arrays belong to the local context and are released on every path from here.
  taosArrayDestroy(cxt.pTableMetaPos);
  taosArrayDestroy(cxt.pTableVgroupPos);

  return code;
}
332

333
// Runs a locally-executed command synchronously and, when it yields a response, loads that
// response into the request's result info. Returns the first failing step's code.
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  STscObj*           pTscObj = pRequest->pTscObj;
  SRetrieveTableRsp* pRsp = NULL;
  int8_t             biMode = atomic_load_8(&pTscObj->biMode);

  int32_t code =
      qExecCommand(&pTscObj->id, pTscObj->sysInfo, pQuery->pRoot, &pRsp, biMode, pTscObj->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS != code || NULL == pRsp) {
    return code;
  }

  return setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4);
}
343

344
// Sends the command message carried by pQuery to the server and blocks on the request's
// response semaphore. A query without a command message is a no-op that succeeds.
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  if (NULL == pCmd) {
    // e.g. drop table if exists not_exists_table
    return TSDB_CODE_SUCCESS;
  }

  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  STscObj*      pConn = pRequest->pTscObj;
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  TSC_ERR_RET(asyncSendMsgToServer(pConn->pAppInfo->pTransporter, &pCmd->epSet, NULL, pSendInfo));
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
  return TSDB_CODE_SUCCESS;
}
363

364
// Returns the application instance of the connection that owns pRequest.
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) {
  STscObj* pConn = pRequest->pTscObj;
  return pConn->pAppInfo;
}
365

366
// Executes a local command and reports the outcome through the request callback.
// For validate-only requests the callback is invoked with 0 and nothing is executed.
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
  if (pRequest->validateOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  STscObj*           pConn = pRequest->pTscObj;
  SReqResultInfo*    pResultInfo = &pRequest->body.resInfo;
  SRetrieveTableRsp* pRsp = NULL;

  int32_t code = qExecCommand(&pConn->id, pConn->sysInfo, pQuery->pRoot, &pRsp, atomic_load_8(&pConn->biMode),
                              pConn->optionInfo.charsetCxt);
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
    code = setQueryResultFromRsp(pResultInfo, pRsp, pResultInfo->convertUcs4);
  }

  pRequest->code = code;
  if (pRequest->code == TSDB_CODE_SUCCESS) {
    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d,QID:0x%" PRIx64,
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
             pRequest->requestId);
  } else {
    // On failure no rows are exposed to the caller.
    pResultInfo->numOfRows = 0;
    tscError("0x%" PRIx64 " fetch results failed, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(code),
             pRequest->requestId);
  }

  doRequestCallback(pRequest, code);
}
394

395
// Asynchronously sends the command message carried by pQuery to the server.
//
// When the request is validate-only, or the query carries no command message, the request
// callback is invoked with 0 and success is returned. If sending fails, the callback is
// invoked with the error, which is also returned.
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
  // "no command message" covers e.g. drop table if exists not_exists_table
  if (pRequest->validateOnly || NULL == pQuery->pCmdMsg) {
    doRequestCallback(pRequest, 0);
    return TSDB_CODE_SUCCESS;
  }

  SCmdMsgInfo* pCmd = pQuery->pCmdMsg;
  pRequest->type = pCmd->msgType;
  pRequest->body.requestMsg = (SDataBuf){.pData = pCmd->pMsg, .len = pCmd->msgLen, .handle = NULL};
  pCmd->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management

  SAppInstInfo* pInst = getAppInfo(pRequest);
  SMsgSendInfo* pSendInfo = buildMsgInfoImpl(pRequest);

  int32_t code = asyncSendMsgToServer(pInst->pTransporter, &pCmd->epSet, NULL, pSendInfo);
  if (0 != code) {
    doRequestCallback(pRequest, code);
  }
  return code;
}
421

422
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
249,194✔
423
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
249,194✔
424
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
249,194✔
425

426
  if (node1->load < node2->load) {
249,194!
427
    return -1;
×
428
  }
429

430
  return node1->load > node2->load;
249,194✔
431
}
432

433
// Replaces the cached qnode list of pInfo with a load-sorted copy of pNodeList.
//
// The previous list (if any) is destroyed first. A NULL pNodeList just clears the cache.
// All of this happens under pInfo->qnodeMutex.
//
// Returns TSDB_CODE_SUCCESS, a lock/unlock error, or the error left in terrno when the copy
// could not be made (in which case the cache is left empty).
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
  int32_t code = TSDB_CODE_SUCCESS;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    taosArrayDestroy(pInfo->pQnodeList);
    pInfo->pQnodeList = NULL;
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
  }

  if (pNodeList) {
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
    if (NULL == pInfo->pQnodeList) {
      // Do not sort or report a list that was never created; surface the failure instead.
      code = terrno;
    } else {
      taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
      // The cast makes the argument match the "%ld" conversion on every platform.
      tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
               (long)taosArrayGetSize(pInfo->pQnodeList));
    }
  }

  // Always release the mutex before reporting any error from the section above.
  int32_t unlockCode = taosThreadMutexUnlock(&pInfo->qnodeMutex);
  TSC_ERR_RET(code);
  TSC_ERR_RET(unlockCode);

  return TSDB_CODE_SUCCESS;
}
451

452
// Tells the caller whether a qnode list still has to be fetched for this request.
//
// *required is false under the vnode and client query policies; otherwise it is true exactly
// when the application instance has no cached qnode list (checked under qnodeMutex).
// Returns TSDB_CODE_SUCCESS or a lock/unlock error.
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
  *required = false;
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
    return TSDB_CODE_SUCCESS;
  }

  SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;

  TSC_ERR_RET(taosThreadMutexLock(&pInst->qnodeMutex));
  *required = (NULL == pInst->pQnodeList);
  TSC_ERR_RET(taosThreadMutexUnlock(&pInst->qnodeMutex));
  return TSDB_CODE_SUCCESS;
}
467

468
// Provides a qnode list for pRequest in *pNodeList.
//
// If the application instance has a cached list, a copy of it is returned. Otherwise (that is,
// whenever *pNodeList is still NULL after the cache check) a new list is fetched through the
// catalog and, on success, also stored in the cache via updateQnodeList.
//
// NOTE(review): when no cached list exists *pNodeList is read without being written first, so
// callers presumably initialise it to NULL - confirm.
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
  int32_t       code = 0;

  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
  if (pInfo->pQnodeList) {
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
  }
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
  if (NULL == *pNodeList) {
    SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
      // Check the array that was just allocated, not the out-parameter itself.
      if (NULL == *pNodeList) {
        TSC_ERR_RET(terrno);
      }
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
    }

    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
      code = updateQnodeList(pInfo, *pNodeList);
    }
  }

  return code;
}
499

500
// Creates the query plan for an already parsed query.
// The request type is taken from the query, a plan context is filled from the request and
// its connection, and qCreateQueryPlan does the work; its result is returned.
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
  pRequest->type = pQuery->msgType;

  SAppInstInfo* pInst = getAppInfo(pRequest);
  STscObj*      pConn = pRequest->pTscObj;

  SPlanContext cxt = {
      .queryId = pRequest->requestId,
      .acctId = pConn->acctId,
      .mgmtEpSet = getEpSet_s(&pInst->mgmtEp),
      .pAstRoot = pQuery->pRoot,
      .showRewrite = pQuery->showRewrite,
      .pMsg = pRequest->msgBuf,
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
      .pUser = pConn->user,
      .timezone = pConn->optionInfo.timezone,
      .sysInfo = pConn->sysInfo,
  };

  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
}
517

518
// (Re)builds the field descriptors of a result set from a schema.
//
// pResInfo  result info whose fields/userFields arrays are replaced
// pSchema   array of numOfCols column schemas
// numOfCols number of columns, must be positive
//
// Both arrays receive name, type and bytes of each column. In userFields the byte count is
// adjusted for variable-length types: the VARSTR header is subtracted for VARCHAR, VARBINARY
// and GEOMETRY, and for NCHAR and JSON the remainder is additionally divided by
// TSDB_NCHAR_SIZE.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_INVALID_PARA for bad arguments, or the allocation
// error from terrno. On allocation failure both array pointers are left NULL.
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols) {
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
    return TSDB_CODE_INVALID_PARA;
  }

  pResInfo->numOfCols = numOfCols;

  // Free-and-clear so that no failure path below can leave a dangling pointer behind.
  taosMemoryFreeClear(pResInfo->fields);
  taosMemoryFreeClear(pResInfo->userFields);

  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  if (NULL == pResInfo->fields) return terrno;
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
  if (NULL == pResInfo->userFields) {
    int32_t code = terrno;  // capture before the free below can touch it
    taosMemoryFreeClear(pResInfo->fields);
    return code;
  }

  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
    pResInfo->fields[i].bytes = pSchema[i].bytes;
    pResInfo->fields[i].type = pSchema[i].type;

    pResInfo->userFields[i].bytes = pSchema[i].bytes;
    pResInfo->userFields[i].type = pSchema[i].type;

    if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR || pSchema[i].type == TSDB_DATA_TYPE_VARBINARY ||
        pSchema[i].type == TSDB_DATA_TYPE_GEOMETRY) {
      pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
    } else if (pSchema[i].type == TSDB_DATA_TYPE_NCHAR || pSchema[i].type == TSDB_DATA_TYPE_JSON) {
      pResInfo->userFields[i].bytes = (pResInfo->userFields[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
    }

    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
  }
  return TSDB_CODE_SUCCESS;
}
562

563
// Stores precision in pResInfo when it is one of the milli/micro/nano precision constants;
// any other value is ignored and the stored precision is left untouched.
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
  bool known = (precision == TSDB_TIME_PRECISION_MILLI) || (precision == TSDB_TIME_PRECISION_MICRO) ||
               (precision == TSDB_TIME_PRECISION_NANO);
  if (known) {
    pResInfo->precision = precision;
  }
}
571

572
// Builds the execution node list for the vnode/client query policies.
//
// Every vgroup of every entry in pDbVgList contributes one node (vgId + epSet). If that
// yields no node at all, the entries of pMnodeList are used instead; if that list is empty
// too, an empty list is returned. On success *pNodeList receives the new array.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_RANGE when an array element cannot be fetched,
// or the error from terrno when an allocation/append fails; the array is destroyed on error.
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
  SArray* pResult = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pResult) {
    return terrno;
  }
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";

  int32_t numOfDbs = taosArrayGetSize(pDbVgList);
  for (int32_t dbIdx = 0; dbIdx < numOfDbs; ++dbIdx) {
    SArray* pVgroups = taosArrayGetP(pDbVgList, dbIdx);
    if (NULL == pVgroups) {
      continue;
    }

    // An empty vgroup array simply contributes nothing.
    int32_t numOfVgroups = taosArrayGetSize(pVgroups);
    for (int32_t vgIdx = 0; vgIdx < numOfVgroups; ++vgIdx) {
      SVgroupInfo* pVgInfo = taosArrayGet(pVgroups, vgIdx);
      if (NULL == pVgInfo) {
        taosArrayDestroy(pResult);
        return TSDB_CODE_OUT_OF_RANGE;
      }

      SQueryNodeLoad node = {0};
      node.addr.nodeId = pVgInfo->vgId;
      node.addr.epSet = pVgInfo->epSet;
      if (NULL == taosArrayPush(pResult, &node)) {
        taosArrayDestroy(pResult);
        return terrno;
      }
    }
  }

  int32_t vnodeNum = taosArrayGetSize(pResult);
  if (vnodeNum > 0) {
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
  } else {
    // No vnode available: fall back to the mnode list, if there is one.
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirstMnode) {
        taosArrayDestroy(pResult);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pResult, pFirstMnode, mnodeNum)) {
        taosArrayDestroy(pResult);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
    }
  }

  *pNodeList = pResult;

  return TSDB_CODE_SUCCESS;
}
637

638
// Builds the execution node list for the qnode query policy.
//
// The entries of pQnodeList are used when it is non-empty; otherwise those of pMnodeList;
// if both are empty an empty list is returned. On success *pNodeList receives the new array.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_RANGE when the first element of the chosen list
// cannot be fetched, or the error from terrno when an allocation/append fails; the array is
// destroyed on error.
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
  SArray* pResult = taosArrayInit(4, sizeof(SQueryNodeLoad));
  if (NULL == pResult) {
    return terrno;
  }

  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
  if (qNodeNum > 0) {
    void* pFirstQnode = taosArrayGet(pQnodeList, 0);
    if (NULL == pFirstQnode) {
      taosArrayDestroy(pResult);
      return TSDB_CODE_OUT_OF_RANGE;
    }
    if (NULL == taosArrayAddBatch(pResult, pFirstQnode, qNodeNum)) {
      taosArrayDestroy(pResult);
      return terrno;
    }
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
  } else {
    // No qnode known: fall back to the mnode list, if there is one.
    int32_t mnodeNum = taosArrayGetSize(pMnodeList);
    if (mnodeNum <= 0) {
      tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
    } else {
      void* pFirstMnode = taosArrayGet(pMnodeList, 0);
      if (NULL == pFirstMnode) {
        taosArrayDestroy(pResult);
        return TSDB_CODE_OUT_OF_RANGE;
      }
      if (NULL == taosArrayAddBatch(pResult, pFirstMnode, mnodeNum)) {
        taosArrayDestroy(pResult);
        return terrno;
      }

      tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
    }
  }

  *pNodeList = pResult;

  return TSDB_CODE_SUCCESS;
}
683

684
void freeVgList(void* list) {
1,056✔
685
  SArray* pList = *(SArray**)list;
1,056✔
686
  taosArrayDestroy(pList);
1,056✔
687
}
1,057✔
688

689
/*
 * Builds the execution node list for an asynchronously scheduled query, according to
 * tsQueryPolicy.
 *
 * VNODE/CLIENT: collects the per-db vgroup lists, taken from pResultMeta->pDbVgroup when
 * pResultMeta is given, otherwise fetched through the catalog for every entry of
 * pRequest->dbList, and passes them to buildVnodePolicyNodeList().
 * HYBRID/QNODE: duplicates the qnode list (from pResultMeta->pQnodeList when it has entries,
 * otherwise from the app instance while holding qnodeMutex) and passes it to
 * buildQnodePolicyNodeList().
 *
 * pRequest    - request being scheduled
 * pNodeList   - out: filled by the policy-specific builder
 * pMnodeList  - forwarded to the policy-specific builder
 * pResultMeta - optional metadata already fetched for this request; may be NULL
 *
 * Returns the result of the policy builder, the first error hit while collecting the
 * inputs, or TSDB_CODE_APP_ERROR for an unknown policy.
 */
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  FDelete fp = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      if (pResultMeta) {
        // fp is left NULL on this path: no per-element destructor is passed at _return
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
        for (int32_t i = 0; i < dbNum; ++i) {
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
          // skip dbs whose metadata fetch failed or produced nothing
          if (pRes->code || NULL == pRes->pRes) {
            continue;
          }

          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
            code = terrno;
            goto _return;
          }
        }
      } else {
        // lists fetched below are released element by element through freeVgList at _return
        fp = freeVgList;

        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
        if (dbNum > 0) {
          SCatalog*     pCtg = NULL;
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
          code = catalogGetHandle(pInst->clusterId, &pCtg);
          if (code != TSDB_CODE_SUCCESS) {
            goto _return;
          }

          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
          if (NULL == pDbVgList) {
            code = terrno;
            goto _return;
          }
          for (int32_t i = 0; i < dbNum; ++i) {
            // reset per iteration so a list already stored in pDbVgList is never touched again
            SArray*          pVgList = NULL;
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                     .requestId = pRequest->requestId,
                                     .requestObjRefId = pRequest->self,
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

            // catalogGetDBVgList will handle dbFName == null.
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
            if (code) {
              goto _return;
            }

            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
              code = terrno;
              // the list never made it into pDbVgList, so it must be released here
              taosArrayDestroy(pVgList);
              goto _return;
            }
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
        if (pRes->code) {
          pQnodeList = NULL;
        } else {
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            goto _return;
          }
        }
      } else {
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
        if (pInst->pQnodeList) {
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
          if (NULL == pQnodeList) {
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
            // do not leave with the mutex held; the duplication error is the one reported
            (void)taosThreadMutexUnlock(&pInst->qnodeMutex);
            goto _return;
          }
        }
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
      }

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:
  taosArrayDestroyEx(pDbVgList, fp);
  taosArrayDestroy(pQnodeList);

  return code;
}
798

799
/*
 * Builds the execution node list for a synchronously scheduled query, according to
 * tsQueryPolicy.
 *
 * VNODE/CLIENT: fetches the vgroup list of every entry of pRequest->dbList through the
 * catalog and passes them to buildVnodePolicyNodeList().
 * HYBRID/QNODE: obtains the qnode list via getQnodeList() and passes it to
 * buildQnodePolicyNodeList().
 *
 * pRequest   - request being scheduled
 * pNodeList  - out: filled by the policy-specific builder
 * pMnodeList - forwarded to the policy-specific builder
 *
 * Returns the result of the policy builder, the first error hit while collecting the
 * inputs, or TSDB_CODE_APP_ERROR for an unknown policy.
 */
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
  SArray* pDbVgList = NULL;
  SArray* pQnodeList = NULL;
  int32_t code = 0;

  switch (tsQueryPolicy) {
    case QUERY_POLICY_VNODE:
    case QUERY_POLICY_CLIENT: {
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
      if (dbNum > 0) {
        SCatalog*     pCtg = NULL;
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
        code = catalogGetHandle(pInst->clusterId, &pCtg);
        if (code != TSDB_CODE_SUCCESS) {
          goto _return;
        }

        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
        if (NULL == pDbVgList) {
          code = terrno;
          goto _return;
        }
        for (int32_t i = 0; i < dbNum; ++i) {
          // reset per iteration so a list already stored in pDbVgList is never touched again
          SArray*          pVgList = NULL;
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
                                   .requestId = pRequest->requestId,
                                   .requestObjRefId = pRequest->self,
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};

          // catalogGetDBVgList will handle dbFName == null.
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
          if (code) {
            goto _return;
          }

          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
            code = terrno;
            // the list never made it into pDbVgList, so freeVgList at _return will not see it
            taosArrayDestroy(pVgList);
            goto _return;
          }
        }
      }

      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
      break;
    }
    case QUERY_POLICY_HYBRID:
    case QUERY_POLICY_QNODE: {
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));

      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
      break;
    }
    default:
      tscError("unknown query policy: %d", tsQueryPolicy);
      return TSDB_CODE_APP_ERROR;
  }

_return:

  // every stored vgroup list is released together with the container
  taosArrayDestroyEx(pDbVgList, freeVgList);
  taosArrayDestroy(pQnodeList);

  return code;
}
864

865
/*
 * Runs a query plan through the scheduler synchronously and stores the outcome on the request.
 *
 * pRequest  - request to execute; its execRes, code and (for write requests) numOfRows are updated
 * pDag      - plan handed to the scheduler
 * pNodeList - candidate nodes handed to the scheduler
 *
 * Returns the scheduler error when schedulerExecJob() fails, otherwise the code carried in
 * the execution result. The same value is written to pRequest->code and terrno.
 */
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;

  SExecResult      execResult = {0};
  SRequestConnInfo connInfo = {
      .pTrans = pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self};
  SSchedulerReq schReq = {
      .syncReq = true,
      .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
      .pConn = &connInfo,
      .pNodeList = pNodeList,
      .pDag = pDag,
      .sql = pRequest->sqlstr,
      .startTs = pRequest->metric.start,
      .execFp = NULL,
      .cbParam = NULL,
      .chkKillFp = chkRequestKilled,
      .chkKillParam = (void*)pRequest->self,
      .pExecRes = &execResult,
      .source = pRequest->source,
      .pWorkerCb = getTaskPoolWorkerCb(),
  };

  int32_t code = schedulerExecJob(&schReq, &pRequest->body.queryJob);

  // the request takes over whatever result the scheduler produced, even on failure
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
  (void)memcpy(&pRequest->body.resInfo.execRes, &execResult, sizeof(execResult));

  if (TSDB_CODE_SUCCESS != code) {
    schedulerFreeJob(&pRequest->body.queryJob, 0);

    pRequest->code = code;
    terrno = code;
    return pRequest->code;
  }

  const bool isWriteReq = (TDMT_VND_SUBMIT == pRequest->type) || (TDMT_VND_DELETE == pRequest->type) ||
                          (TDMT_VND_CREATE_TABLE == pRequest->type);
  if (isWriteReq) {
    pRequest->body.resInfo.numOfRows = execResult.numOfRows;

    // inserts additionally feed the per-cluster summary counter
    if (TDMT_VND_SUBMIT == pRequest->type) {
      SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, execResult.numOfRows);
    }

    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  pRequest->code = execResult.code;
  terrno = execResult.code;
  return pRequest->code;
}
918

919
/*
 * Post-processes a submit response: for every create-table entry that carries table meta,
 * the meta is forwarded to handleCreateTbExecRes().
 *
 * pRequest, epset - not used by this function
 * res             - SSubmitRsp2 to inspect
 * pCatalog        - catalog handle forwarded to handleCreateTbExecRes()
 *
 * Returns TSDB_CODE_SUCCESS, or the first error reported by handleCreateTbExecRes().
 */
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SSubmitRsp2* pSubmitRsp = (SSubmitRsp2*)res;
  if (NULL == pSubmitRsp->aCreateTbRsp) {
    return TSDB_CODE_SUCCESS;
  }

  int32_t rspCnt = taosArrayGetSize(pSubmitRsp->aCreateTbRsp);
  for (int32_t idx = 0; idx < rspCnt; ++idx) {
    SVCreateTbRsp* pCreateRsp = (SVCreateTbRsp*)taosArrayGet(pSubmitRsp->aCreateTbRsp, idx);
    if (!pCreateRsp->pMeta) {
      continue;  // nothing to forward for this entry
    }
    TSC_ERR_RET(handleCreateTbExecRes(pCreateRsp->pMeta, pCatalog));
  }

  return TSDB_CODE_SUCCESS;
}
936

937
/*
 * Post-processes a query execution result: converts the returned STbVerInfo entries into
 * STbSVersion records and passes them to catalogChkTbMetaVersion().
 *
 * pRequest - request the result belongs to (used to build the catalog connection info)
 * res      - SArray of STbVerInfo
 * pCatalog - catalog handle
 * epset    - mgmt endpoint set copied into the connection info
 *
 * Returns TSDB_CODE_SUCCESS for an empty result, terrno when the temporary array cannot be
 * built, otherwise the result of catalogChkTbMetaVersion().
 */
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
  SArray* pVerInfos = (SArray*)res;
  int32_t verCnt = taosArrayGetSize(pVerInfos);
  if (verCnt <= 0) {
    return TSDB_CODE_SUCCESS;
  }

  SArray* pVersions = taosArrayInit(verCnt, sizeof(STbSVersion));
  if (NULL == pVersions) {
    return terrno;
  }

  int32_t code = 0;
  int32_t idx = 0;
  for (; idx < verCnt; ++idx) {
    STbVerInfo* pInfo = taosArrayGet(pVerInfos, idx);
    if (NULL == pInfo) {
      code = terrno;
      break;
    }

    STbSVersion ver = {.tbFName = pInfo->tbFName, .sver = pInfo->sversion, .tver = pInfo->tversion};
    if (NULL == taosArrayPush(pVersions, &ver)) {
      code = terrno;
      break;
    }
  }

  // the catalog is consulted only when every entry was converted
  if (idx == verCnt) {
    SRequestConnInfo connInfo = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
                                 .requestId = pRequest->requestId,
                                 .requestObjRefId = pRequest->self,
                                 .mgmtEps = *epset};

    code = catalogChkTbMetaVersion(pCatalog, &connInfo, pVersions);
  }

  taosArrayDestroy(pVersions);
  return code;
}
976

977
// Forwards an alter-table result (treated as STableMetaRsp) to catalogUpdateTableMeta() and returns its code.
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogUpdateTableMeta(pCatalog, pMetaRsp);
}
980

981
// Forwards a create-table result (treated as STableMetaRsp) to catalogAsyncUpdateTableMeta() and returns its code.
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
  STableMetaRsp* pMetaRsp = (STableMetaRsp*)res;
  return catalogAsyncUpdateTableMeta(pCatalog, pMetaRsp);
}
984

985
/*
 * Dispatches the execution result stored on the request to the handler matching its
 * message type.
 *
 * pRequest - request whose body.resInfo.execRes is processed
 *
 * Returns pRequest->code when there is no result payload, the catalogGetHandle() error when
 * the catalog cannot be obtained, TSDB_CODE_APP_ERROR for an unexpected message type,
 * otherwise the handler's result. For TDMT_VND_CREATE_TABLE every entry is processed and
 * the first failure is the one reported.
 */
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
  if (NULL == pRequest->body.resInfo.execRes.res) {
    return pRequest->code;
  }

  SCatalog*     pCatalog = NULL;
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);

  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
  if (code) {
    return code;
  }

  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
  SExecResult* pRes = &pRequest->body.resInfo.execRes;

  switch (pRes->msgType) {
    case TDMT_VND_ALTER_TABLE:
    case TDMT_MND_ALTER_STB: {
      code = handleAlterTbExecRes(pRes->res, pCatalog);
      break;
    }
    case TDMT_VND_CREATE_TABLE: {
      SArray* pList = (SArray*)pRes->res;
      int32_t num = taosArrayGetSize(pList);
      for (int32_t i = 0; i < num; ++i) {
        void* res = taosArrayGetP(pList, i);
        // handleCreateTbExecRes will handle res == null
        int32_t tbCode = handleCreateTbExecRes(res, pCatalog);
        // keep going so the remaining tables are still handled, but do not let a later
        // success hide an earlier failure
        if (TSDB_CODE_SUCCESS == code) {
          code = tbCode;
        }
      }
      break;
    }
    case TDMT_MND_CREATE_STB: {
      code = handleCreateTbExecRes(pRes->res, pCatalog);
      break;
    }
    case TDMT_VND_SUBMIT: {
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);

      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
      break;
    }
    case TDMT_SCH_QUERY:
    case TDMT_SCH_MERGE_QUERY: {
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
      break;
    }
    default:
      tscError("0x%" PRIx64 ", invalid exec result for request type %d,QID:0x%" PRIx64, pRequest->self, pRequest->type,
               pRequest->requestId);
      code = TSDB_CODE_APP_ERROR;
  }

  return code;
}
1040

1041
// Returns the fileProcessing flag of a vnode-modify statement; any other statement type yields false.
static bool incompletaFileParsing(SNode* pStmt) {
  if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt)) {
    return false;
  }
  return ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
}
1044

1045
/*
 * Resumes parsing and planning of a request after its sub query has delivered a result,
 * then hands the outcome to handleQueryAnslyseRes().
 *
 * pRequest - request to continue; its wrapper, query and post plan are used
 * pBlock   - sub query result passed to qContinueParsePostQuery(); callers in this file
 *            also pass NULL
 *
 * The first error encountered (acquire, parse, plan, then release) is the one reported.
 */
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;

  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    int64_t analyseStart = taosGetTimestampUs();
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
  }

  // the release result must not overwrite an earlier parse/plan failure
  int32_t releaseCode = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
  if (TSDB_CODE_SUCCESS == code) {
    code = releaseCode;
  }

  handleQueryAnslyseRes(pWrapper, NULL, code);
}
1062

1063
/*
 * Delivers the request's result code through doRequestCallback().
 *
 * When relation.userRefId is 0 or refers to this request, the callback is invoked on
 * pRequest itself. Otherwise the request referenced by userRefId receives the code and the
 * callback; if it can no longer be acquired, only an error is logged.
 */
void returnToUser(SRequestObj* pRequest) {
  const bool isUserRequest =
      (0 == pRequest->relation.userRefId) || (pRequest->self == pRequest->relation.userRefId);
  if (isUserRequest) {
    // return to client
    doRequestCallback(pRequest, pRequest->code);
    return;
  }

  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
  if (NULL == pUserReq) {
    tscError("0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there,QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.userRefId, pRequest->requestId);
    return;
  }

  pUserReq->code = pRequest->code;
  // return to client
  doRequestCallback(pUserReq, pUserReq->code);
  (void)releaseRequest(pRequest->relation.userRefId);
}
1082

1083
/*
 * Copies numOfRows rows fetched from pRes into a newly created data block.
 *
 * Each row is read as (ts, vgId, vgVer): the first three columns must be present and
 * non-NULL. The largest ts seen (starting from 0) is stored in info.window.ekey.
 *
 * pRes      - result set to read from
 * numOfRows - number of rows to fetch and copy
 * pBlock    - out: the created block on success. Once the block has been created, any
 *             later failure destroys it and resets *pBlock to NULL.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR when the result does not have the
 * expected shape, or the error of the failing block operation.
 */
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
  int64_t     lastTs = 0;
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
  int32_t     numOfFields = taos_num_fields(pRes);

  // pRow[0..2] are read unconditionally below, so a narrower result cannot be handled
  if (NULL == pResFields || numOfFields < 3) {
    tscError("invalid data from vnode");
    *pBlock = NULL;
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = createDataBlock(pBlock);
  if (code) {
    return code;
  }

  for (int32_t i = 0; i < numOfFields; ++i) {
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
    if (TSDB_CODE_SUCCESS != code) {
      goto _error;
    }
  }

  code = blockDataEnsureCapacity(*pBlock, numOfRows);
  if (TSDB_CODE_SUCCESS != code) {
    goto _error;
  }

  for (int32_t i = 0; i < numOfRows; ++i) {
    TAOS_ROW pRow = taos_fetch_row(pRes);
    // a missing row is as invalid as a row with missing leading columns
    if (NULL == pRow || NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
      tscError("invalid data from vnode");
      code = TSDB_CODE_TSC_INTERNAL_ERROR;
      goto _error;
    }
    int64_t ts = *(int64_t*)pRow[0];
    if (lastTs < ts) {
      lastTs = ts;
    }

    for (int32_t j = 0; j < numOfFields; ++j) {
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
      if (TSDB_CODE_SUCCESS != code) {
        goto _error;
      }
    }

    tscDebug("lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1], *(int64_t*)pRow[2]);
  }

  (*pBlock)->info.window.ekey = lastTs;
  (*pBlock)->info.rows = numOfRows;

  tscDebug("lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
  return TSDB_CODE_SUCCESS;

_error:
  // never hand a destroyed block back to the caller
  blockDataDestroy(*pBlock);
  *pBlock = NULL;
  return code;
}
1138

1139
/*
 * Fetch callback of a sub query: turns the fetched rows into a data block and continues
 * the request referenced by relation.nextRefId with it.
 *
 * param  - not used by this function
 * res    - the sub query's SRequestObj
 * rowNum - number of fetched rows, passed on to createResultBlock()
 *
 * When the request already failed, or the block cannot be built, the error is reported
 * through returnToUser().
 */
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
  SRequestObj* pRequest = (SRequestObj*)res;
  if (pRequest->code) {
    returnToUser(pRequest);
    return;
  }

  SSDataBlock* pResBlock = NULL;
  pRequest->code = createResultBlock(res, rowNum, &pResBlock);
  if (TSDB_CODE_SUCCESS != pRequest->code) {
    tscError("0x%" PRIx64 ", create result block failed,QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
             tstrerror(pRequest->code));
    returnToUser(pRequest);
    return;
  }

  SRequestObj* pFollowUp = acquireRequest(pRequest->relation.nextRefId);
  if (NULL == pFollowUp) {
    tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there,QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.nextRefId, pRequest->requestId);
  } else {
    continuePostSubQuery(pFollowUp, pResBlock);
    (void)releaseRequest(pRequest->relation.nextRefId);
  }

  // the block is released here on both paths
  blockDataDestroy(pResBlock);
}
1166

1167
/*
 * Moves on from a finished sub query to the request referenced by relation.nextRefId.
 *
 * For a query result the rows are fetched asynchronously first (postSubQueryFetchCb does
 * the continuation); otherwise the next request is continued right away with no data block.
 */
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
  SRequestObj* pRequest = pWrapper->pRequest;
  if (TD_RES_QUERY(pRequest)) {
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
    return;
  }

  SRequestObj* pFollowUp = acquireRequest(pRequest->relation.nextRefId);
  if (NULL == pFollowUp) {
    tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there,QID:0x%" PRIx64, pRequest->self,
             pRequest->relation.nextRefId, pRequest->requestId);
    return;
  }

  continuePostSubQuery(pFollowUp, NULL);
  (void)releaseRequest(pRequest->relation.nextRefId);
}
1183

1184
// TODO: refactor the error code management
1185
/*
 * Completion callback invoked with the outcome of an asynchronously scheduled job.
 *
 * pResult - execution result; when non-NULL its contents are copied into the request and
 *           the struct itself is freed here
 * param   - the SSqlCallbackWrapper of the request
 * code    - execution result code
 *
 * Steps: record the result on the request; for submit/delete/create-table requests
 * accumulate the row counters and free the job; restart the query when the error is one the
 * client handles itself; otherwise post-process the result and either continue a CSV
 * insert, continue with the next related request, or invoke the request callback.
 */
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
  SSqlCallbackWrapper* pCbWrapper = param;
  SRequestObj*         pRequest = pCbWrapper->pRequest;
  STscObj*             pTsc = pRequest->pTscObj;

  pRequest->code = code;
  if (pResult) {
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
  }

  int32_t    reqType = pRequest->type;
  const bool isWriteReq =
      (TDMT_VND_SUBMIT == reqType) || (TDMT_VND_DELETE == reqType) || (TDMT_VND_CREATE_TABLE == reqType);
  if (isWriteReq) {
    if (pResult) {
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;

      // record the insert rows
      if (TDMT_VND_SUBMIT == reqType) {
        SAppClusterSummary* pSummary = &pTsc->pAppInfo->summary;
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertRows, pResult->numOfRows);
      }
    }
    schedulerFreeJob(&pRequest->body.queryJob, 0);
  }

  taosMemoryFree(pResult);
  tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(code),
           pRequest->requestId);

  // errors the client can recover from: drop the cached meta and run the statement again
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
    tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d,QID:0x%" PRIx64, pRequest->self,
             tstrerror(code), pRequest->retry, pRequest->requestId);
    if (TSDB_CODE_SUCCESS != removeMeta(pTsc, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("0x%" PRIx64 " remove meta failed,QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
    restartAsyncQuery(pRequest, code);
    return;
  }

  tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    if (TSDB_CODE_SUCCESS != removeMeta(pTsc, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
      tscError("0x%" PRIx64 " remove meta failed,QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
    }
  }

  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;

  // a post-processing failure only replaces a successful request code
  int32_t rspCode = handleQueryExecRsp(pRequest);
  if (TSDB_CODE_SUCCESS == pRequest->code) {
    pRequest->code = rspCode;
  }

  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
    continueInsertFromCsv(pCbWrapper, pRequest);
    return;
  }

  if (pRequest->relation.nextRefId) {
    handlePostSubQuery(pCbWrapper);
    return;
  }

  destorySqlCallbackWrapper(pCbWrapper);
  pRequest->pWrapper = NULL;

  // return to client
  doRequestCallback(pRequest, code);
}
1253

1254
/*
 * Executes a parsed query synchronously according to its execution mode and records the
 * outcome on the request.
 *
 * pRequest  - request to execute
 * pQuery    - parsed query; destroyed here unless keepQuery is set
 * keepQuery - when false, pQuery is destroyed before returning
 * res       - optional out: receives the execution result payload, which is then detached
 *             from the request
 */
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
  int32_t code = 0;

  if (pQuery->pRoot) {
    pRequest->stmtType = pQuery->pRoot->type;

    // retries are not counted again in the cluster summary
    if (!pRequest->inRetry) {
      SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
      if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
      } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
        (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
      }
    }
  }

  pRequest->body.execMode = pQuery->execMode;
  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL:
      if (pRequest->validateOnly) {
        break;
      }
      if (NULL == pQuery->pRoot) {
        terrno = TSDB_CODE_INVALID_PARA;
        code = terrno;
      } else {
        code = execLocalCmd(pRequest, pQuery);
      }
      break;
    case QUERY_EXEC_MODE_RPC:
      if (!pRequest->validateOnly) {
        code = execDdlQuery(pRequest, pQuery);
      }
      break;
    case QUERY_EXEC_MODE_SCHEDULE: {
      SArray* pMnodes = taosArrayInit(4, sizeof(SQueryNodeLoad));
      if (NULL == pMnodes) {
        code = terrno;
        break;
      }

      SQueryPlan* pPlan = NULL;
      code = getPlan(pRequest, pQuery, &pPlan, pMnodes);
      if (TSDB_CODE_SUCCESS == code) {
        pRequest->body.subplanNum = pPlan->numOfSubplans;
      }
      if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
        SArray* pExecNodes = NULL;
        code = buildSyncExecNodeList(pRequest, &pExecNodes, pMnodes);
        if (TSDB_CODE_SUCCESS == code) {
          code = scheduleQuery(pRequest, pPlan, pExecNodes);
        }
        taosArrayDestroy(pExecNodes);
      }
      taosArrayDestroy(pMnodes);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT:
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      break;
    default:
      break;
  }

  if (!keepQuery) {
    qDestroyQuery(pQuery);
  }

  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
    if (TSDB_CODE_SUCCESS != ret) {
      tscError("0x%" PRIx64 " remove meta failed,code:%d,QID:0x%" PRIx64, pRequest->self, ret, pRequest->requestId);
    }
  }

  // result post-processing only runs for a successful execution
  if (TSDB_CODE_SUCCESS == code) {
    code = handleQueryExecRsp(pRequest);
  }

  if (TSDB_CODE_SUCCESS != code) {
    pRequest->code = code;
  }

  if (res) {
    // hand the payload over to the caller and detach it from the request
    *res = pRequest->body.resInfo.execRes.res;
    pRequest->body.resInfo.execRes.res = NULL;
  }
}
1342

1343
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
10,597,823✔
1344
                                 SSqlCallbackWrapper* pWrapper) {
1345
  int32_t code = TSDB_CODE_SUCCESS;
10,597,823✔
1346
  pRequest->type = pQuery->msgType;
10,597,823✔
1347
  SArray*     pMnodeList = NULL;
10,597,823✔
1348
  SQueryPlan* pDag = NULL;
10,597,823✔
1349
  int64_t     st = taosGetTimestampUs();
10,608,308✔
1350

1351
  if (!pRequest->parseOnly) {
10,608,308!
1352
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
10,612,534✔
1353
    if (NULL == pMnodeList) {
10,606,941!
1354
      code = terrno;
×
1355
    }
1356
    SPlanContext cxt = {.queryId = pRequest->requestId,
21,240,895✔
1357
                        .acctId = pRequest->pTscObj->acctId,
10,606,941✔
1358
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
10,606,941✔
1359
                        .pAstRoot = pQuery->pRoot,
10,633,954✔
1360
                        .showRewrite = pQuery->showRewrite,
10,633,954✔
1361
                        .isView = pWrapper->pParseCtx->isView,
10,633,954✔
1362
                        .isAudit = pWrapper->pParseCtx->isAudit,
10,633,954✔
1363
                        .pMsg = pRequest->msgBuf,
10,633,954✔
1364
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1365
                        .pUser = pRequest->pTscObj->user,
10,633,954✔
1366
                        .sysInfo = pRequest->pTscObj->sysInfo,
10,633,954✔
1367
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
10,633,954✔
1368
                        .allocatorId = pRequest->allocatorRefId};
10,633,954✔
1369
    if (TSDB_CODE_SUCCESS == code) {
10,633,954!
1370
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
10,634,568✔
1371
    }
1372
    if (code) {
10,573,454✔
1373
      tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
896!
1374
               pRequest->requestId);
1375
    } else {
1376
      pRequest->body.subplanNum = pDag->numOfSubplans;
10,572,558✔
1377
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
10,572,558✔
1378
    }
1379
  }
1380

1381
  pRequest->metric.execStart = taosGetTimestampUs();
10,591,026✔
1382
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
10,591,026✔
1383

1384
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
21,181,819!
1385
    SArray* pNodeList = NULL;
10,606,486✔
1386
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
10,606,486✔
1387
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
921,877✔
1388
    }
1389

1390
    SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
10,606,486✔
1391
                             .requestId = pRequest->requestId,
10,571,904✔
1392
                             .requestObjRefId = pRequest->self};
10,571,904✔
1393
    SSchedulerReq    req = {
21,130,839✔
1394
           .syncReq = false,
1395
           .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
10,571,904✔
1396
           .pConn = &conn,
1397
           .pNodeList = pNodeList,
1398
           .pDag = pDag,
1399
           .allocatorRefId = pRequest->allocatorRefId,
10,571,904✔
1400
           .sql = pRequest->sqlstr,
10,571,904✔
1401
           .startTs = pRequest->metric.start,
10,571,904✔
1402
           .execFp = schedulerExecCb,
1403
           .cbParam = pWrapper,
1404
           .chkKillFp = chkRequestKilled,
1405
           .chkKillParam = (void*)pRequest->self,
10,571,904✔
1406
           .pExecRes = NULL,
1407
           .source = pRequest->source,
10,571,904✔
1408
           .pWorkerCb = getTaskPoolWorkerCb(),
10,571,904✔
1409
    };
1410
    if (TSDB_CODE_SUCCESS == code) {
10,558,935!
1411
      code = schedulerExecJob(&req, &pRequest->body.queryJob);
10,565,678✔
1412
    }
1413

1414
    taosArrayDestroy(pNodeList);
10,589,903✔
1415
  } else {
1416
    qDestroyQueryPlan(pDag);
×
1417
    tscDebug("0x%" PRIx64 " plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
1,077!
1418
             pRequest->requestId);
1419
    destorySqlCallbackWrapper(pWrapper);
1,077✔
1420
    pRequest->pWrapper = NULL;
1,077✔
1421
    if (TSDB_CODE_SUCCESS != code) {
1,077✔
1422
      pRequest->code = terrno;
896✔
1423
    }
1424

1425
    doRequestCallback(pRequest, code);
1,077✔
1426
  }
1427

1428
  // todo not to be released here
1429
  taosArrayDestroy(pMnodeList);
10,591,870✔
1430

1431
  return code;
10,598,557✔
1432
}
1433

1434
/*
 * Dispatch a parsed query according to pQuery->execMode.
 *
 * A parse-only request just gets its callback invoked with code 0. Otherwise
 * the execution mode is recorded on the request, the insert/select request
 * counters of the cluster summary are bumped (skipped for retries and for
 * queries without a root node) and the query is handed to the executor that
 * matches the mode.
 *
 * pWrapper is destroyed here for every mode except QUERY_EXEC_MODE_SCHEDULE;
 * in schedule mode it is passed on to asyncExecSchQuery().
 * The codes returned by the executors are not propagated by this function.
 */
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
  if (pRequest->parseOnly) {
    doRequestCallback(pRequest, 0);
    return;
  }

  pRequest->body.execMode = pQuery->execMode;
  if (pRequest->body.execMode != QUERY_EXEC_MODE_SCHEDULE) {
    // only the schedule path forwards the wrapper
    destorySqlCallbackWrapper(pWrapper);
    pRequest->pWrapper = NULL;
  }

  if (pQuery->pRoot && !pRequest->inRetry) {
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;

    const bool countAsInsert = (pQuery->pRoot->type == QUERY_NODE_VNODE_MODIFY_STMT) &&
                               (((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType == 0);
    if (countAsInsert) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfInsertsReq, 1);
    } else if (pQuery->pRoot->type == QUERY_NODE_SELECT_STMT) {
      (void)atomic_add_fetch_64((int64_t*)&pSummary->numOfQueryReq, 1);
    }
  }

  switch (pQuery->execMode) {
    case QUERY_EXEC_MODE_LOCAL: {
      asyncExecLocalCmd(pRequest, pQuery);
      break;
    }
    case QUERY_EXEC_MODE_RPC: {
      (void)asyncExecDdlQuery(pRequest, pQuery);
      break;
    }
    case QUERY_EXEC_MODE_SCHEDULE: {
      (void)asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
      break;
    }
    case QUERY_EXEC_MODE_EMPTY_RESULT: {
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
      doRequestCallback(pRequest, 0);
      break;
    }
    default: {
      tscError("0x%" PRIx64 " invalid execMode %d", pRequest->self, pQuery->execMode);
      doRequestCallback(pRequest, -1);
      break;
    }
  }
}
1480

1481
/*
 * Refresh the catalog entries of every database and table referenced by the
 * request.
 *
 * Returns TSDB_CODE_APP_ERROR when the request lists neither databases nor
 * tables, the catalog error code of the first failing step otherwise, and
 * TSDB_CODE_SUCCESS when everything was refreshed. Processing stops at the
 * first failure.
 */
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
  const int32_t numOfDbs = taosArrayGetSize(pRequest->dbList);
  const int32_t numOfTables = taosArrayGetSize(pRequest->tableList);
  if (numOfDbs <= 0 && numOfTables <= 0) {
    return TSDB_CODE_APP_ERROR;
  }

  SCatalog* pCatalog = NULL;
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                           .requestId = pRequest->requestId,
                           .requestObjRefId = pRequest->self,
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

  // a NULL element is passed through; the original notes that catalogRefreshDBVgInfo handles it
  for (int32_t idx = 0; TSDB_CODE_SUCCESS == code && idx < numOfDbs; ++idx) {
    char* dbFName = taosArrayGet(pRequest->dbList, idx);
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
  }

  // likewise, the original notes that catalogRefreshTableMeta handles a NULL table name
  for (int32_t idx = 0; TSDB_CODE_SUCCESS == code && idx < numOfTables; ++idx) {
    SName* pTableName = taosArrayGet(pRequest->tableList, idx);
    code = catalogRefreshTableMeta(pCatalog, &conn, pTableName, -1);
  }

  return code;
}
1523

1524
/*
 * Drop the cached catalog meta of every name in tbList.
 *
 * When isView is true the entries are removed as views (NULL entries are
 * skipped); otherwise they are removed as tables. Returns the catalog handle
 * error if the handle cannot be obtained, leaves through TSC_ERR_RET on the
 * first removal failure, and returns TSDB_CODE_SUCCESS otherwise.
 */
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
  SCatalog* pCatalog = NULL;
  int32_t   numOfNames = taosArrayGetSize(tbList);

  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
  if (TSDB_CODE_SUCCESS != code) {
    return code;
  }

  for (int32_t idx = 0; idx < numOfNames; ++idx) {
    SName* pName = taosArrayGet(tbList, idx);

    if (!isView) {
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pName));
      continue;
    }

    if (NULL == pName) {
      continue;
    }

    char dbFName[TSDB_DB_FNAME_LEN];
    (void)tNameGetFullDbName(pName, dbFName);
    TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pName->tname, 0));
  }

  return TSDB_CODE_SUCCESS;
}
1551

1552
/*
 * Build the mnode endpoint set from the configured first/second endpoints.
 *
 * Each non-empty endpoint string is parsed into the next free slot of
 * pEpSet->epSet and kept only if its FQDN resolves to an IPv4 address; an
 * endpoint that fails to resolve is logged, its slot is zeroed and it is
 * skipped.
 *
 * Returns 0 on success. On failure a TSDB error code is returned:
 *   - TSDB_CODE_TSC_INVALID_FQDN when an endpoint string is too long or
 *     firstEp cannot be parsed,
 *   - the parser's code when secondEp cannot be parsed,
 *   - TSDB_CODE_RPC_NETWORK_UNAVAIL when no endpoint could be resolved.
 */
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
  pEpSet->version = 0;

  // init mnode ip set
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
  mgmtEpSet->numOfEps = 0;
  mgmtEpSet->inUse = 0;

  if (firstEp && firstEp[0] != 0) {
    if (strlen(firstEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      // return the error code itself (previously a bare -1), like every other failure path
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }
    uint32_t addr = 0;
    code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      // discard the half-filled slot so the next endpoint can reuse it
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (secondEp && secondEp[0] != 0) {
    if (strlen(secondEp) >= TSDB_EP_LEN) {
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
      return terrno;
    }

    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
    if (code != TSDB_CODE_SUCCESS) {
      return code;
    }
    uint32_t addr = 0;
    code = taosGetIpv4FromFqdn(mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
    if (code) {
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
    } else {
      mgmtEpSet->numOfEps++;
    }
  }

  if (mgmtEpSet->numOfEps == 0) {
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
  }

  return 0;
}
1610

1611
int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
13,181✔
1612
                        SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1613
  *pTscObj = NULL;
13,181✔
1614
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
13,181✔
1615
  if (TSDB_CODE_SUCCESS != code) {
13,181!
1616
    return code;
×
1617
  }
1618

1619
  SRequestObj* pRequest = NULL;
13,181✔
1620
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
13,181✔
1621
  if (TSDB_CODE_SUCCESS != code) {
13,181!
1622
    destroyTscObj(*pTscObj);
×
1623
    return code;
×
1624
  }
1625

1626
  pRequest->sqlstr = taosStrdup("taos_connect");
13,181✔
1627
  if (pRequest->sqlstr) {
13,181!
1628
    pRequest->sqlLen = strlen(pRequest->sqlstr);
13,181✔
1629
  } else {
1630
    return terrno;
×
1631
  }
1632

1633
  SMsgSendInfo* body = NULL;
13,181✔
1634
  code = buildConnectMsg(pRequest, &body);
13,181✔
1635
  if (TSDB_CODE_SUCCESS != code) {
13,181!
1636
    destroyTscObj(*pTscObj);
×
1637
    return code;
×
1638
  }
1639

1640
  // int64_t transporterId = 0;
1641
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &(*pTscObj)->pAppInfo->mgmtEp.epSet, NULL, body);
13,181✔
1642
  if (TSDB_CODE_SUCCESS != code) {
13,181!
1643
    destroyTscObj(*pTscObj);
×
1644
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1645
    return code;
×
1646
  }
1647
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
13,181!
1648
    destroyTscObj(*pTscObj);
×
1649
    tscError("failed to wait sem, code:%s", terrstr());
×
1650
    return terrno;
×
1651
  }
1652
  if (pRequest->code != TSDB_CODE_SUCCESS) {
13,181✔
1653
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
144!
1654
    tscError("failed to connect to server, reason: %s", errorMsg);
144!
1655

1656
    terrno = pRequest->code;
144✔
1657
    destroyRequest(pRequest);
144✔
1658
    taos_close_internal(*pTscObj);
144✔
1659
    *pTscObj = NULL;
144✔
1660
    return terrno;
144✔
1661
  } else {
1662
    tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p,QID:0x%" PRIx64, (*pTscObj)->id,
13,037✔
1663
             (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1664
    destroyRequest(pRequest);
13,037✔
1665
  }
1666
  return code;
13,037✔
1667
}
1668

1669
/*
 * Allocate and fill the send-info for a TDMT_MND_CONNECT message.
 *
 * On success *pMsgSendInfo points to a new SMsgSendInfo whose param holds a
 * heap copy of the request ref id and whose msgInfo carries the serialized
 * SConnectReq; TSDB_CODE_SUCCESS is returned.
 *
 * On failure everything allocated here (serialized buffer, param and the
 * send-info itself) is released, *pMsgSendInfo is reset to NULL and the
 * value of terrno observed at the point of failure is returned.
 */
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo) {
  int32_t     code = TSDB_CODE_SUCCESS;
  void*       pReq = NULL;
  SConnectReq connectReq = {0};
  STscObj*    pObj = pRequest->pTscObj;

  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (*pMsgSendInfo == NULL) {
    return terrno;
  }

  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;

  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
  (*pMsgSendInfo)->requestId = pRequest->requestId;
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
  if (NULL == (*pMsgSendInfo)->param) {
    code = terrno;
    goto _error;
  }

  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;

  char* db = getDbOfConnection(pObj);
  if (db != NULL) {
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
  } else if (terrno) {
    code = terrno;
    goto _error;
  }
  taosMemoryFreeClear(db);

  connectReq.connType = pObj->connType;
  connectReq.pid = appInfo.pid;
  connectReq.startTime = appInfo.startTime;

  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));

  // first pass computes the encoded size, second pass fills the buffer
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
  pReq = taosMemoryMalloc(contLen);
  if (NULL == pReq) {
    code = terrno;
    goto _error;
  }

  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
    code = terrno;
    goto _error;
  }

  (*pMsgSendInfo)->msgInfo.len = contLen;
  (*pMsgSendInfo)->msgInfo.pData = pReq;
  return TSDB_CODE_SUCCESS;

_error:
  // param used to leak on these paths and the out-pointer was left dangling
  taosMemoryFree(pReq);
  taosMemoryFree((*pMsgSendInfo)->param);
  taosMemoryFree(*pMsgSendInfo);
  *pMsgSendInfo = NULL;
  return code;
}
1726

1727
/*
 * Apply an endpoint-set change reported with a response to the client-side
 * state that matches the message target.
 *
 *   - no endpoint set (pEpSet == NULL): nothing to do;
 *   - mnode target: replace the application's mgmt endpoint set;
 *   - vnode target: update the vgroup's endpoint set in the catalog and, on
 *     success, release pSendInfo->target.dbFName;
 *   - any other target: only log.
 *
 * For mnode/vnode targets a NULL pTscObj is logged as an error and the
 * update is skipped.
 */
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
  if (pEpSet == NULL) {
    return;
  }

  switch (pSendInfo->target.type) {
    case TARGET_TYPE_MNODE: {
      if (pTscObj == NULL) {
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
        return;
      }

      SEpSet* pCurSet = &pTscObj->pAppInfo->mgmtEp.epSet;
      SEp*    pCurEp = &pCurSet->eps[pCurSet->inUse];
      SEp*    pNextEp = &pEpSet->eps[pEpSet->inUse];
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pCurSet->inUse, pCurSet->numOfEps,
               pCurEp->fqdn, pCurEp->port, pEpSet->inUse, pEpSet->numOfEps, pNextEp->fqdn, pNextEp->port);
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
      break;
    }
    case TARGET_TYPE_VNODE: {
      if (pTscObj == NULL) {
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
        return;
      }

      SCatalog* pCatalog = NULL;
      int32_t   rc = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
      if (TSDB_CODE_SUCCESS != rc) {
        tscError("fail to get catalog handle, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
                 tstrerror(rc));
        return;
      }

      rc = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
      if (TSDB_CODE_SUCCESS != rc) {
        tscError("fail to update catalog vg epset, clusterId:%" PRIx64 ", error %s", pTscObj->pAppInfo->clusterId,
                 tstrerror(rc));
        return;
      }
      taosMemoryFreeClear(pSendInfo->target.dbFName);
      break;
    }
    default: {
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
      break;
    }
  }
}
1776

1777
/*
 * Deliver one RPC response to the handler registered in its send-info.
 *
 * The request named by the send-info (if any) is pinned through the request
 * ref pool for the duration of the call, endpoint-set changes are applied via
 * updateTargetEpSet(), the payload is copied into a fresh buffer and the
 * handler pSendInfo->fp is invoked with it.
 *
 * pMsg->pCont is always released with rpcFreeCont(). pEpSet is freed here on
 * the early error paths; otherwise it is handed to the handler through
 * SDataBuf.pEpSet (presumably the handler takes ownership - confirm against
 * the handlers). The send-info is destroyed unless the message carried no
 * ahandle.
 *
 * Returns TSDB_CODE_TSC_INTERNAL_ERROR when the message carries no ahandle or
 * the acquired request does not match the recorded ref id, otherwise
 * TSDB_CODE_SUCCESS (the handler's own result is not propagated).
 */
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
  if (pMsg->info.ahandle == NULL) {
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
    rpcFreeCont(pMsg->pCont);
    taosMemoryFree(pEpSet);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  SRequestObj* pRequest = NULL;  // non-NULL exactly while we hold a ref on the request
  STscObj*     pTscObj = NULL;

  STraceId* trace = &pMsg->info.traceId;
  char      tbuf[40] = {0};
  TRACE_TO_STR(trace, tbuf);

  tscDebug("processMsgFromServer handle %p, message: %s, size:%d, code: %s,QID:%s", pMsg->info.handle,
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code), tbuf);

  if (pSendInfo->requestObjRefId != 0) {
    pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (pRequest) {
      if (pRequest->self != pSendInfo->requestObjRefId) {
        tscError("doProcessMsgFromServer pRequest->self:%" PRId64 " != pSendInfo->requestObjRefId:%" PRId64,
                 pRequest->self, pSendInfo->requestObjRefId);

        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
          tscError("doProcessMsgFromServer taosReleaseRef failed");
        }
        rpcFreeCont(pMsg->pCont);
        taosMemoryFree(pEpSet);
        destroySendMsgInfo(pSendInfo);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pTscObj = pRequest->pTscObj;
    }
  }

  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);

  SDataBuf buf = {.msgType = pMsg->msgType,
                  .len = pMsg->contLen,
                  .pData = NULL,
                  .handle = pMsg->info.handle,
                  .handleRefId = pMsg->info.refId,
                  .pEpSet = pEpSet};

  // hand the handler its own copy of the payload; the RPC buffer is freed below
  if (pMsg->code != TSDB_CODE_OUT_OF_MEMORY && pMsg->contLen > 0) {
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
    if (buf.pData == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
    } else {
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
    }
  }

  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

  // Release is keyed on the acquired request rather than on pTscObj, so the ref
  // is dropped even if the request carries no connection object.
  if (pRequest != NULL) {
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
    if (TSDB_CODE_SUCCESS != code) {
      tscError("doProcessMsgFromServer taosReleaseRef failed");
      terrno = code;
      pMsg->code = code;
    }
  }

  rpcFreeCont(pMsg->pCont);
  destroySendMsgInfo(pSendInfo);
  return TSDB_CODE_SUCCESS;
}
1848
int32_t doProcessMsgFromServer(void* param) {
14,066,094✔
1849
  AsyncArg* arg = (AsyncArg*)param;
14,066,094✔
1850
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
14,066,094✔
1851
  taosMemoryFree(arg);
14,061,796✔
1852
  return code;
14,065,490✔
1853
}
1854

1855
/*
 * RPC callback for responses coming back from the server.
 *
 * The endpoint set (if any) is copied to the heap, transport error codes are
 * normalized, and the message is queued for asynchronous processing by
 * doProcessMsgFromServer(). If any step of the hand-off fails, the error is
 * stored in pMsg->code and the message is processed inline instead.
 *
 * parent is not used.
 */
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
  int32_t   code = 0;
  SEpSet*   pEpSetCopy = NULL;
  AsyncArg* pArg = NULL;

  if (NULL != pEpSet) {
    pEpSetCopy = taosMemoryCalloc(1, sizeof(SEpSet));
    if (pEpSetCopy == NULL) {
      code = terrno;
      pMsg->code = terrno;
      goto _inline;
    }
    (void)memcpy((void*)pEpSetCopy, (void*)pEpSet, sizeof(SEpSet));
  }

  // pMsg is response msg
  if (pMsg->msgType != TDMT_MND_CONNECT + 1) {
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
    }
  } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
    // connect response: restore origin code
    pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
  } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
    pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
  }

  pArg = taosMemoryCalloc(1, sizeof(AsyncArg));
  if (pArg == NULL) {
    code = terrno;
    pMsg->code = code;
    goto _inline;
  }

  pArg->msg = *pMsg;
  pArg->pEpset = pEpSetCopy;

  code = taosAsyncExec(doProcessMsgFromServer, pArg, NULL);
  if (code == 0) {
    return;
  }

  pMsg->code = code;
  taosMemoryFree(pArg);

_inline:
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
  code = doProcessMsgFromServerImpl(pMsg, pEpSetCopy);
  if (code != 0) {
    tscError("failed to sched msg to tsc, tsc ready quit");
  }
}
1906

1907
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
3✔
1908
  tscDebug("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
3✔
1909
  if (user == NULL) {
3!
1910
    user = TSDB_DEFAULT_USER;
×
1911
  }
1912

1913
  if (auth == NULL) {
3!
1914
    tscError("No auth info is given, failed to connect to server");
×
1915
    return NULL;
×
1916
  }
1917

1918
  STscObj* pObj = NULL;
3✔
1919
  int32_t  code = taos_connect_internal(ip, user, NULL, auth, db, port, CONN_TYPE__QUERY, &pObj);
3✔
1920
  if (TSDB_CODE_SUCCESS == code) {
3✔
1921
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1✔
1922
    if (NULL == rid) {
1!
1923
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
1924
    }
1925
    *rid = pObj->id;
1✔
1926
    return (TAOS*)rid;
1✔
1927
  }
1928

1929
  return NULL;
2✔
1930
}
1931

1932
// TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, const char* pass, int passLen,
1933
//                      const char* db, int dbLen, uint16_t port) {
1934
//   char ipStr[TSDB_EP_LEN] = {0};
1935
//   char dbStr[TSDB_DB_NAME_LEN] = {0};
1936
//   char userStr[TSDB_USER_LEN] = {0};
1937
//   char passStr[TSDB_PASSWORD_LEN] = {0};
1938
//
1939
//   tstrncpy(ipStr, ip, TMIN(TSDB_EP_LEN - 1, ipLen));
1940
//   tstrncpy(userStr, user, TMIN(TSDB_USER_LEN - 1, userLen));
1941
//   tstrncpy(passStr, pass, TMIN(TSDB_PASSWORD_LEN - 1, passLen));
1942
//   tstrncpy(dbStr, db, TMIN(TSDB_DB_NAME_LEN - 1, dbLen));
1943
//   return taos_connect(ipStr, userStr, passStr, dbStr, port);
1944
// }
1945

1946
/*
 * Point pResultInfo->row[] / length[] at the cells of the row selected by
 * pResultInfo->current.
 *
 * Variable-length columns are located through the column's offset array
 * (offset -1 or a var-null column yields NULL/0); fixed-length columns are
 * located at bytes * current inside the column data unless the null bitmap
 * marks the cell as NULL.
 */
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
  for (int32_t col = 0; col < pResultInfo->numOfCols; ++col) {
    SResultColumn* pCol = &pResultInfo->pCol[col];

    int32_t type = pResultInfo->fields[col].type;
    int32_t bytes = pResultInfo->fields[col].bytes;

    if (!IS_VAR_DATA_TYPE(type)) {
      if (colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
        pResultInfo->row[col] = NULL;
        pResultInfo->length[col] = 0;
      } else {
        pResultInfo->row[col] = pCol->pData + bytes * pResultInfo->current;
        pResultInfo->length[col] = bytes;
      }
      continue;
    }

    if (IS_VAR_NULL_TYPE(type, bytes) || pCol->offset[pResultInfo->current] == -1) {
      pResultInfo->row[col] = NULL;
      pResultInfo->length[col] = 0;
      continue;
    }

    // var-length cell: length header followed by the payload
    char* pCell = pCol->offset[pResultInfo->current] + pCol->pData;
    pResultInfo->length[col] = varDataLen(pCell);
    pResultInfo->row[col] = varDataVal(pCell);
  }
}
20,541,312✔
1974

1975
/*
 * Synchronously fetch the next row (or block) of a query result.
 *
 * When the current block is exhausted a new one is fetched from the scheduler
 * and installed with setQueryResultFromRsp(). If setupOneRowPtr is set, the
 * row pointers are positioned on the current row and the cursor advances.
 *
 * Returns pResultInfo->row, or NULL when pRequest is NULL, the result is
 * complete, a fetch/convert step fails (pRequest->code holds the error) or
 * the fetched block is empty.
 */
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (NULL == pRequest) {
    return NULL;
  }

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;

  const bool blockExhausted = (pResultInfo->pData == NULL) || (pResultInfo->current >= pResultInfo->numOfRows);
  if (blockExhausted) {
    // All data has returned to App already, no need to try again
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    SSchedulerReq req = {.syncReq = true, .pFetchRes = (void**)&pResultInfo->pData};

    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, convertUcs4);
    if (TSDB_CODE_SUCCESS != pRequest->code) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d,QID:0x%" PRIx64,
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
             pRequest->requestId);

    // account the fetched payload in the cluster summary
    SAppClusterSummary* pSummary = &pRequest->pTscObj->pAppInfo->summary;
    (void)atomic_add_fetch_64((int64_t*)&pSummary->fetchBytes, pRequest->body.resInfo.payloadLen);

    if (0 == pResultInfo->numOfRows) {
      return NULL;
    }
  }

  if (setupOneRowPtr) {
    doSetOneRowPtr(pResultInfo);
    pResultInfo->current += 1;
  }

  return pResultInfo->row;
}
2023

2024
/*
 * Fetch callback used by doAsyncFetchRows(): posts the semaphore passed in
 * param. res and numOfRows are ignored.
 */
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
  tsem_t* pSem = (tsem_t*)param;

  if (tsem_post(pSem) != TSDB_CODE_SUCCESS) {
    tscError("failed to post sem, code:%s", terrstr());
  }
}
913,326✔
2030

2031
/*
 * Fetch the next row (or block) of a query result by driving the
 * asynchronous fetch API and blocking on a local semaphore until its
 * callback fires.
 *
 * If setupOneRowPtr is set, the row pointers are positioned on the current
 * row and the cursor advances.
 *
 * Returns pResultInfo->row, or NULL when pRequest is NULL, the result is
 * complete, the block is empty or pRequest->code reports an error. If the
 * semaphore cannot be initialized, the fetch is not issued, pRequest->code is
 * set to the init error and NULL is returned.
 */
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
  if (pRequest == NULL) {
    return NULL;
  }

  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
    // All data has returned to App already, no need to try again
    if (pResultInfo->completed) {
      pResultInfo->numOfRows = 0;
      return NULL;
    }

    // convert ucs4 to native multi-bytes string
    pResultInfo->convertUcs4 = convertUcs4;
    tsem_t  sem;
    int32_t semCode = tsem_init(&sem, 0, 0);
    if (TSDB_CODE_SUCCESS != semCode) {
      tscError("failed to init sem, code:%s", terrstr());
      // Waiting on / destroying a semaphore that was never initialized is
      // undefined behavior, so give up before issuing the fetch.
      pRequest->code = semCode;
      pResultInfo->numOfRows = 0;
      return NULL;
    }
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
      tscError("failed to wait sem, code:%s", terrstr());
    }
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
      tscError("failed to destroy sem, code:%s", terrstr());
    }
    pRequest->inCallback = false;
  }

  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    return NULL;
  } else {
    if (setupOneRowPtr) {
      doSetOneRowPtr(pResultInfo);
      pResultInfo->current += 1;
    }

    return pResultInfo->row;
  }
}
2071

2072
/*
 * Lazily allocate the per-column helper arrays of a result info (row, pCol,
 * length, convertBuf), each sized by numOfCols. Nothing is done when row is
 * already set.
 *
 * Returns TSDB_CODE_SUCCESS, or the terrno observed after a failed
 * allocation. On failure all four arrays are released and their pointers
 * reset to NULL, so a later call can retry and teardown cannot free them
 * twice.
 */
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
  if (pResInfo->row == NULL) {
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);

    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
      // remember the allocation error before releasing anything
      int32_t code = terrno;
      // clear the pointers as well: a dangling non-NULL `row` would make the
      // next call skip allocation and use freed memory
      taosMemoryFreeClear(pResInfo->row);
      taosMemoryFreeClear(pResInfo->pCol);
      taosMemoryFreeClear(pResInfo->length);
      taosMemoryFreeClear(pResInfo->convertBuf);
      return code;
    }
  }

  return TSDB_CODE_SUCCESS;
}
2090

2091
/*
 * Convert every non-empty NCHAR column of the current block from UCS-4 to the
 * multi-byte charset.
 *
 * For each such column the per-column convert buffer is (re)allocated to
 * colLength[i] bytes, every non-NULL cell is converted into it back to back,
 * the column's offsets are rewritten to point into the new buffer and the
 * column data / row pointer are switched over to it.
 *
 * Returns TSDB_CODE_SUCCESS, TSDB_CODE_TSC_INTERNAL_ERROR when no converter
 * can be acquired or a converted cell fails validation, or terrno when the
 * buffer cannot be allocated. The converter is released on every path.
 */
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength) {
  int32_t convIdx = -1;
  iconv_t conv = taosAcquireConv(&convIdx, C2M, pResultInfo->charsetCxt);
  if ((iconv_t)-1 == conv) return TSDB_CODE_TSC_INTERNAL_ERROR;

  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    int32_t type = pResultInfo->fields[i].type;
    int32_t bytes = pResultInfo->fields[i].bytes;

    if (type != TSDB_DATA_TYPE_NCHAR || colLength[i] <= 0) {
      continue;
    }

    char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
    if (NULL == p) {
      taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
      return terrno;
    }

    pResultInfo->convertBuf[i] = p;

    SResultColumn* pCol = &pResultInfo->pCol[i];
    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
      if (pCol->offset[j] == -1) {
        continue;  // NULL cell, nothing to convert
      }

      char* pSrc = pCol->offset[j] + pCol->pData;

      int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pSrc), varDataLen(pSrc), varDataVal(p), conv);

      const bool invalid = (len < 0) || (len > bytes) || ((p + len) >= (pResultInfo->convertBuf[i] + colLength[i]));
      if (invalid) {
        tscError(
            "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
            "colLength[i]):%p",
            len, bytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
        taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }

      // write the length header, repoint the cell and advance past it
      varDataSetLen(p, len);
      pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
      p += (len + VARSTR_HEADER_SIZE);
    }

    pCol->pData = pResultInfo->convertBuf[i];
    pResultInfo->row[i] = pCol->pData;
  }

  taosReleaseConv(convIdx, conv, C2M, pResultInfo->charsetCxt);
  return TSDB_CODE_SUCCESS;
}
2137

2138
/*
 * Size in bytes of the fixed part of a result block that precedes the
 * per-column length array: five int32_t fields, one uint64_t field, and one
 * (int8_t, int32_t) pair per column.
 *
 * p is not inspected.
 */
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
  (void)p;

  const size_t fixedPart = sizeof(int32_t) * 5 + sizeof(uint64_t);
  const size_t perColumn = sizeof(int8_t) + sizeof(int32_t);

  return fixedPart + numOfCols * perColumn;
}
2142

2143
/*
 * Estimate the buffer size needed to hold the current block after its JSON
 * columns have been rendered as strings.
 *
 * The block is walked column by column. Non-JSON columns contribute their
 * offset array / null bitmap plus their recorded length. JSON columns
 * contribute their offset array plus the larger of the recorded length and a
 * per-cell estimate of the rendered size. One extra bool is added at the end
 * for the trailing blankfill field.
 *
 * Returns the estimated length, TSDB_CODE_TSC_INTERNAL_ERROR when the column
 * count stored in the block disagrees with numOfCols, or -1 when a JSON cell
 * has an unexpected inner type.
 */
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
  char*         pBlock = (char*)pResultInfo->pData;
  const int32_t blockVersion = *(int32_t*)pBlock;

  const int32_t numOfRows = pResultInfo->numOfRows;
  const int32_t numOfCols = pResultInfo->numOfCols;

  // block layout:
  // | version | total length | total rows | total columns | flag seg | block group id | column schema |
  // | each column length |
  const int32_t cols = *(int32_t*)(pBlock + sizeof(int32_t) * 3);
  if (cols != numOfCols) {
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t  len = getVersion1BlockMetaSize(pBlock, numOfCols);
  int32_t* colLength = (int32_t*)(pBlock + len);
  len += sizeof(int32_t) * numOfCols;

  char* pCursor = pBlock + len;  // start of the column currently being visited
  for (int32_t i = 0; i < numOfCols; ++i) {
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];

    if (pResultInfo->fields[i].type != TSDB_DATA_TYPE_JSON) {
      int32_t prefixLen;
      if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
        prefixLen = numOfRows * sizeof(int32_t);  // offset array
      } else {
        prefixLen = BitmapLen(pResultInfo->numOfRows);  // null bitmap
      }
      len += (prefixLen + colLen);
      pCursor += prefixLen;
      pCursor += colLen;
      continue;
    }

    int32_t* offset = (int32_t*)pCursor;
    int32_t  offsetLen = numOfRows * sizeof(int32_t);
    len += offsetLen;
    pCursor += offsetLen;

    int32_t estimateColLen = 0;
    for (int32_t j = 0; j < numOfRows; ++j) {
      if (offset[j] == -1) {
        continue;
      }

      char*   data = offset[j] + pCursor;
      int32_t jsonInnerType = *data;
      char*   jsonInnerData = data + CHAR_BYTES;

      if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
        estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
      } else if (tTagIsJson(data)) {
        estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
      } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
        estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
      } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
        estimateColLen += (VARSTR_HEADER_SIZE + 32);
      } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
        estimateColLen += (VARSTR_HEADER_SIZE + 5);
      } else {
        tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
        return -1;
      }
    }

    len += TMAX(colLen, estimateColLen);
    pCursor += colLen;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side.
  len += sizeof(bool);
  return len;
}
2214

2215
/*
 * Re-encode every JSON column of the current result block as a var-length string column.
 *
 * If no field has type TSDB_DATA_TYPE_JSON the block is left untouched. Otherwise a new buffer
 * (pResultInfo->convertJson, sized by estimateJsonLen()) receives a copy of the block in which
 * each JSON cell is replaced by its textual form. The new total length is written at byte
 * offset 4 of that buffer and pResultInfo->pData is redirected to it.
 *
 * Returns TSDB_CODE_SUCCESS, terrno when the allocation or the tag-to-JSON conversion fails, or
 * TSDB_CODE_TSC_INTERNAL_ERROR when the block content is inconsistent.
 */
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
  const int32_t rowCnt = pResultInfo->numOfRows;
  const int32_t colCnt = pResultInfo->numOfCols;

  // look for the first JSON column; without one there is nothing to convert
  int32_t firstJson = 0;
  while (firstJson < colCnt && pResultInfo->fields[firstJson].type != TSDB_DATA_TYPE_JSON) {
    ++firstJson;
  }
  if (firstJson >= colCnt) {
    return TSDB_CODE_SUCCESS;
  }

  tscDebug("start to convert form json format string");

  char*         src = (char*)pResultInfo->pData;
  const int32_t blockVersion = *(int32_t*)src;
  const bool    isV1 = (blockVersion == BLOCK_VERSION_1);
  const int32_t dataLen = estimateJsonLen(pResultInfo);
  if (dataLen <= 0) {
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->convertJson);
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
  if (pResultInfo->convertJson == NULL) {
    return terrno;
  }
  char* out = pResultInfo->convertJson;

  int32_t       totalLen = 0;
  const int32_t cols = *(int32_t*)(src + sizeof(int32_t) * 3);  // 4th int32 of the header
  if (colCnt != cols) {
    tscError("doConvertJson error: numOfCols:%d != cols:%d", colCnt, cols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // leading meta section (size given by getVersion1BlockMetaSize) is copied verbatim
  int32_t len = getVersion1BlockMetaSize(src, colCnt);
  (void)memcpy(out, src, len);
  src += len;
  out += len;
  totalLen += len;

  // per-column length table; the copy is patched below for JSON columns
  len = sizeof(int32_t) * colCnt;
  int32_t* srcLens = (int32_t*)src;
  int32_t* outLens = (int32_t*)out;
  (void)memcpy(out, src, len);
  src += len;
  out += len;
  totalLen += len;

  char* srcCol = src;
  char* outCol = out;
  for (int32_t c = 0; c < colCnt; ++c) {
    int32_t srcColLen = isV1 ? htonl(srcLens[c]) : srcLens[c];
    int32_t outColLen = isV1 ? htonl(outLens[c]) : outLens[c];
    if (srcColLen >= dataLen) {
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", srcColLen, dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }

    if (pResultInfo->fields[c].type == TSDB_DATA_TYPE_JSON) {
      int32_t* srcOffsets = (int32_t*)srcCol;
      int32_t* outOffsets = (int32_t*)outCol;

      // offset array is copied first; entries of converted rows are overwritten below
      len = rowCnt * sizeof(int32_t);
      (void)memcpy(outCol, srcCol, len);
      srcCol += len;
      outCol += len;
      totalLen += len;

      len = 0;  // from here on: bytes written to the output column payload
      for (int32_t r = 0; r < rowCnt; ++r) {
        if (srcOffsets[r] == -1) {
          continue;  // row keeps its copied -1 offset and produces no payload
        }
        char* cell = srcOffsets[r] + srcCol;

        int32_t innerType = *cell;                 // first byte tags the value kind
        char*   innerData = cell + CHAR_BYTES;     // value follows the tag byte
        char    text[TSDB_MAX_JSON_TAG_LEN] = {0};  // var-string form of this cell
        if (innerType == TSDB_DATA_TYPE_NULL) {
          (void)snprintf(varDataVal(text), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
          varDataSetLen(text, strlen(varDataVal(text)));
        } else if (tTagIsJson(cell)) {
          char* jsonString = NULL;
          parseTagDatatoJson(cell, &jsonString, pResultInfo->charsetCxt);
          if (jsonString == NULL) {
            tscError("doConvertJson error: parseTagDatatoJson failed");
            return terrno;
          }
          STR_TO_VARSTR(text, jsonString);
          taosMemoryFree(jsonString);
        } else if (innerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
          *(char*)varDataVal(text) = '\"';
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(innerData), varDataLen(innerData),
                                         varDataVal(text) + CHAR_BYTES, pResultInfo->charsetCxt);
          if (length <= 0) {
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
              pResultInfo->charsetCxt != NULL ? ((SConvInfo *)(pResultInfo->charsetCxt))->charset : tsCharset);
            length = 0;  // fall back to an empty quoted string
          }
          varDataSetLen(text, length + CHAR_BYTES * 2);
          *(char*)POINTER_SHIFT(varDataVal(text), length + CHAR_BYTES) = '\"';
        } else if (innerType == TSDB_DATA_TYPE_DOUBLE) {
          double jsonVd = *(double*)(innerData);
          (void)snprintf(varDataVal(text), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
          varDataSetLen(text, strlen(varDataVal(text)));
        } else if (innerType == TSDB_DATA_TYPE_BOOL) {
          (void)snprintf(varDataVal(text), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
                         (*((char*)innerData) == 1) ? "true" : "false");
          varDataSetLen(text, strlen(varDataVal(text)));
        } else {
          tscError("doConvertJson error: invalid type:%d", innerType);
          return TSDB_CODE_TSC_INTERNAL_ERROR;
        }

        outOffsets[r] = len;
        (void)memcpy(outCol + len, text, varDataTLen(text));
        len += varDataTLen(text);
      }

      outColLen = len;
      totalLen += outColLen;
      outLens[c] = isV1 ? htonl(len) : len;
    } else {
      // non-JSON column: copy its leading offset array / null bitmap, then the payload, unchanged
      if (IS_VAR_DATA_TYPE(pResultInfo->fields[c].type)) {
        len = rowCnt * sizeof(int32_t);
      } else {
        len = BitmapLen(pResultInfo->numOfRows);
      }
      (void)memcpy(outCol, srcCol, len);
      srcCol += len;
      outCol += len;
      totalLen += len;
      totalLen += srcColLen;
      (void)memcpy(outCol, srcCol, srcColLen);
    }

    srcCol += srcColLen;
    outCol += outColLen;
  }

  // Ensure the complete structure of the block, including the blankfill field,
  // even though it is not used on the client side. The byte itself is not copied;
  // the buffer was zero-allocated.
  totalLen += sizeof(bool);

  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
  pResultInfo->pData = pResultInfo->convertJson;
  return TSDB_CODE_SUCCESS;
}
2367

2368
/*
 * Point the per-column accessors of pResultInfo (pCol[], row[], length[]) into the raw result
 * block referenced by pResultInfo->pData.
 *
 * Steps: validate arguments, run doPrepareResPtr() and doConvertJson(), walk the block header,
 * then assign for every column either its offset array (var-length types) or its null bitmap,
 * followed by its data pointer. For BLOCK_VERSION_1 blocks each column length is byte-swapped
 * in place with htonl(). When convertUcs4 is true, doConvertUCS4() is applied last.
 *
 * Returns TSDB_CODE_SUCCESS (also when numOfRows is 0), the error of a helper, or
 * TSDB_CODE_TSC_INTERNAL_ERROR when the block is inconsistent with pResultInfo.
 */
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4) {
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
    tscError("setResultDataPtr paras error");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (pResultInfo->numOfRows == 0) {
    return TSDB_CODE_SUCCESS;
  }

  if (pResultInfo->pData == NULL) {
    tscError("setResultDataPtr error: pData is NULL");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  int32_t code = doPrepareResPtr(pResultInfo);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }
  code = doConvertJson(pResultInfo);  // may redirect pResultInfo->pData
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  char* p = (char*)pResultInfo->pData;

  // header: version, total data length, rows, cols (four int32 values)
  int32_t blockVersion = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t dataLen = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t rows = *(int32_t*)p;
  p += sizeof(int32_t);

  int32_t cols = *(int32_t*)p;
  p += sizeof(int32_t);

  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  // an int32 and a uint64 header field follow; neither is used here, so just step over them
  p += sizeof(int32_t);
  p += sizeof(uint64_t);

  // per-column schema entries (int8 + int32 each) are skipped without validation
  p += (sizeof(int8_t) + sizeof(int32_t)) * pResultInfo->numOfCols;

  int32_t* colLength = (int32_t*)p;
  p += sizeof(int32_t) * pResultInfo->numOfCols;

  char* pStart = p;
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
    if ((pStart - pResultInfo->pData) >= dataLen) {
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (blockVersion == BLOCK_VERSION_1) {
      colLength[i] = htonl(colLength[i]);
    }
    // a negative length would move pStart backwards, so reject it together with oversized ones
    if (colLength[i] < 0 || colLength[i] >= dataLen) {
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
      tscError("invalid type %d", pResultInfo->fields[i].type);
      return TSDB_CODE_TSC_INTERNAL_ERROR;
    }
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
    } else {
      pResultInfo->pCol[i].nullbitmap = pStart;
      pStart += BitmapLen(pResultInfo->numOfRows);
    }

    pResultInfo->pCol[i].pData = pStart;
    pResultInfo->length[i] = pResultInfo->fields[i].bytes;
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;

    pStart += colLength[i];
  }

  p = pStart;
  // a trailing bool (blankFill) closes the block; it is not read on the client side
  p += sizeof(bool);
  int32_t offset = p - pResultInfo->pData;
  if (offset > dataLen) {
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  if (convertUcs4) {
    code = doConvertUCS4(pResultInfo, colLength);
  }

  return code;
}
2478

2479
/*
 * Return a heap copy of the connection's current database name, taken under pObj->mutex.
 * Resets terrno to TSDB_CODE_SUCCESS first. Returns NULL when no database name is set or
 * when the duplication fails (an error is logged in that case).
 * The copy comes from taosStrndup(); presumably the caller releases it - verify against callers.
 */
char* getDbOfConnection(STscObj* pObj) {
  char* dbCopy = NULL;

  terrno = TSDB_CODE_SUCCESS;
  (void)taosThreadMutexLock(&pObj->mutex);

  if (pObj->db[0] != '\0') {
    dbCopy = taosStrndup(pObj->db, tListLen(pObj->db));
    if (NULL == dbCopy) {
      tscError("failed to taosStrndup db name");
    }
  }

  (void)taosThreadMutexUnlock(&pObj->mutex);
  return dbCopy;
}
2494

2495
/*
 * Store db as the connection's current database name (bounded copy into pTscObj->db,
 * performed under pTscObj->mutex). Logs an error and does nothing when either argument is NULL.
 */
void setConnectionDB(STscObj* pTscObj, const char* db) {
  if (NULL == db || NULL == pTscObj) {
    tscError("setConnectionDB para is NULL");
    return;
  }

  (void)taosThreadMutexLock(&pTscObj->mutex);
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
}
2505

2506
/*
 * Clear the connection's current database name (sets it to the empty string under
 * pTscObj->mutex). A NULL connection is ignored.
 */
void resetConnectDB(STscObj* pTscObj) {
  if (pTscObj != NULL) {
    (void)taosThreadMutexLock(&pTscObj->mutex);
    pTscObj->db[0] = '\0';
    (void)taosThreadMutexUnlock(&pTscObj->mutex);
  }
}
2515

2516
/*
 * Install a retrieve response as the current result of pResultInfo.
 *
 * The previously held response message is freed and replaced by pRsp; row count, completion
 * flag and precision are taken from the response header. A non-empty payload is either
 * decompressed into pResultInfo->decompBuf (when the response is flagged compressed and the
 * embedded compressed length is smaller than the raw length) or referenced in place.
 * Finally setResultDataPtr() wires up the column pointers.
 *
 * Returns TSDB_CODE_SUCCESS, terrno / TSDB_CODE_FAILED on allocation or decompression failure,
 * TSDB_CODE_TSC_INTERNAL_ERROR on inconsistent lengths or NULL arguments, or the result of
 * setResultDataPtr().
 */
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
  if (NULL == pResultInfo || NULL == pRsp) {
    tscError("setQueryResultFromRsp paras is null");
    return TSDB_CODE_TSC_INTERNAL_ERROR;
  }

  taosMemoryFreeClear(pResultInfo->pRspMsg);
  pResultInfo->pRspMsg = (const char*)pRsp;
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
  pResultInfo->current = 0;
  pResultInfo->completed = (pRsp->completed == 1);
  pResultInfo->precision = pRsp->precision;

  // decompress data if needed
  const int32_t payloadLen = htonl(pRsp->payloadLen);

  // make sure the decompression buffer can hold payloadLen bytes
  // NOTE(review): the decompressor below writes rawLen bytes; presumably rawLen <= payloadLen - confirm
  if (pRsp->compressed) {
    if (NULL == pResultInfo->decompBuf) {
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
      if (NULL == pResultInfo->decompBuf) {
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
        return terrno;
      }
      pResultInfo->decompBufSize = payloadLen;
    } else if (pResultInfo->decompBufSize < payloadLen) {
      char* grown = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
      if (NULL == grown) {
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
        return terrno;
      }

      pResultInfo->decompBuf = grown;
      pResultInfo->decompBufSize = payloadLen;
    }
  }

  if (payloadLen > 0) {
    // payload starts with two int32 values: compressed length, then raw length
    const int32_t compLen = *(int32_t*)pRsp->data;
    const int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
    char*         body = (char*)pRsp->data + sizeof(int32_t) * 2;

    if (!(pRsp->compressed && compLen < rawLen)) {
      // stored uncompressed: use the message bytes directly
      pResultInfo->pData = body;
      pResultInfo->payloadLen = htonl(pRsp->compLen);
      if (pRsp->compLen != pRsp->payloadLen) {
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
    } else {
      int32_t outLen = tsDecompressString(body, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
      if (outLen < 0) {
        tscError("tsDecompressString failed");
        return terrno ? terrno : TSDB_CODE_FAILED;
      }
      if (outLen != rawLen) {
        tscError("tsDecompressString failed, len:%d != rawLen:%d", outLen, rawLen);
        return TSDB_CODE_TSC_INTERNAL_ERROR;
      }
      pResultInfo->pData = pResultInfo->decompBuf;
      pResultInfo->payloadLen = rawLen;
    }
  }

  // TODO handle the compressed case
  pResultInfo->totalRows += pResultInfo->numOfRows;

  return setResultDataPtr(pResultInfo, convertUcs4);
}
2588

2589
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
4✔
2590
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
4✔
2591
  void*              clientRpc = NULL;
4✔
2592
  SServerStatusRsp   statusRsp = {0};
4✔
2593
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
4✔
2594
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
4✔
2595
  SRpcMsg  rpcRsp = {0};
4✔
2596
  SRpcInit rpcInit = {0};
4✔
2597
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
4✔
2598

2599
  rpcInit.label = "CHK";
4✔
2600
  rpcInit.numOfThreads = 1;
4✔
2601
  rpcInit.cfp = NULL;
4✔
2602
  rpcInit.sessions = 16;
4✔
2603
  rpcInit.connType = TAOS_CONN_CLIENT;
4✔
2604
  rpcInit.idleTime = tsShellActivityTimer * 1000;
4✔
2605
  rpcInit.compressSize = tsCompressMsgSize;
4✔
2606
  rpcInit.user = "_dnd";
4✔
2607

2608
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
4✔
2609
  connLimitNum = TMAX(connLimitNum, 10);
4✔
2610
  connLimitNum = TMIN(connLimitNum, 500);
4✔
2611
  rpcInit.connLimitNum = connLimitNum;
4✔
2612
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
4✔
2613
  rpcInit.readTimeout = tsReadTimeout;
4✔
2614
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
4!
2615
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2616
    goto _OVER;
×
2617
  }
2618

2619
  clientRpc = rpcOpen(&rpcInit);
4✔
2620
  if (clientRpc == NULL) {
4!
2621
    code = terrno;
×
2622
    tscError("failed to init server status client since %s", tstrerror(code));
×
2623
    goto _OVER;
×
2624
  }
2625

2626
  if (fqdn == NULL) {
4!
2627
    fqdn = tsLocalFqdn;
4✔
2628
  }
2629

2630
  if (port == 0) {
4!
2631
    port = tsServerPort;
4✔
2632
  }
2633

2634
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
4✔
2635
  epSet.eps[0].port = (uint16_t)port;
4✔
2636
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
4✔
2637
  if (TSDB_CODE_SUCCESS != ret) {
4!
2638
    tscError("failed to send recv since %s", tstrerror(ret));
×
2639
    goto _OVER;
×
2640
  }
2641

2642
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
4!
2643
    tscError("failed to send server status req since %s", terrstr());
1!
2644
    goto _OVER;
1✔
2645
  }
2646

2647
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
3!
2648
    tscError("failed to parse server status rsp since %s", terrstr());
×
2649
    goto _OVER;
×
2650
  }
2651

2652
  code = statusRsp.statusCode;
3✔
2653
  if (details != NULL) {
3!
2654
    tstrncpy(details, statusRsp.details, maxlen);
3✔
2655
  }
2656

2657
_OVER:
×
2658
  if (clientRpc != NULL) {
4!
2659
    rpcClose(clientRpc);
4✔
2660
  }
2661
  if (rpcRsp.pCont != NULL) {
4✔
2662
    rpcFreeCont(rpcRsp.pCont);
3✔
2663
  }
2664
  return code;
4✔
2665
}
2666

2667
/*
 * Add one table name to the per-database request map.
 *
 * The name is described by up to two [pos, len] slices of str: with len2 > 0 the first slice
 * is the database and the second the table; otherwise the first slice is the table and db is
 * used as the database name. The table is appended to the STablesReq stored in pHash under
 * "<acctId>.<dbname>", creating that entry when it does not exist yet.
 *
 * Returns TSDB_CODE_SUCCESS, -1 for an empty/invalid name, or an error code when an
 * allocation or the hash insertion fails. A newly created table array is released again when
 * it could not be handed over to the hash.
 */
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
                      int32_t acctId, char* db) {
  SName name = {0};

  if (len1 <= 0) {
    return -1;
  }

  const char* dbName = db;
  const char* tbName = NULL;
  int32_t     dbLen = 0;
  int32_t     tbLen = 0;
  if (len2 > 0) {
    dbName = str + pos1;
    dbLen = len1;
    tbName = str + pos2;
    tbLen = len2;
  } else {
    if (db == NULL) {
      return -1;  // no database in the name and no default database to fall back to
    }
    dbLen = strlen(db);
    tbName = str + pos1;
    tbLen = len1;
  }

  if (dbLen <= 0 || tbLen <= 0) {
    return -1;
  }

  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
    return -1;
  }

  if (tNameAddTbName(&name, tbName, tbLen)) {
    return -1;
  }

  char dbFName[TSDB_DB_FNAME_LEN] = {0};
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);

  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
  if (pDb) {
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
    }
    return TSDB_CODE_SUCCESS;
  }

  // first table of this database: build a fresh entry and hand it to the hash
  STablesReq newDb = {0};
  newDb.pTables = taosArrayInit(20, sizeof(SName));
  if (NULL == newDb.pTables) {
    return terrno;
  }
  tstrncpy(newDb.dbFName, dbFName, TSDB_DB_FNAME_LEN);
  if (NULL == taosArrayPush(newDb.pTables, &name)) {
    int32_t code = terrno;  // capture before cleanup
    taosArrayDestroy(newDb.pTables);
    return code;
  }

  int32_t code = taosHashPut(pHash, dbFName, strlen(dbFName), &newDb, sizeof(newDb));
  if (code != TSDB_CODE_SUCCESS) {
    taosArrayDestroy(newDb.pTables);  // the hash did not take the entry, so the array is still ours
    terrno = code;
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
2725

2726
/*
 * Parse a comma-separated list of table names and group them by database.
 *
 * Each item is `tb` or `db.tb`; a part may be wrapped in backquotes, in which case any
 * character is accepted inside the quotes. Unquoted parts consist of letters, digits and '_'.
 * Blanks, tabs, CR and LF end a part. Items without a database part use dbName.
 *
 * On success *pReq receives a new array of STablesReq (one per database) and
 * TSDB_CODE_SUCCESS is returned. On any failure (syntax error, invalid name, allocation
 * failure) everything built so far is released, terrno is set to
 * TSDB_CODE_TSC_INVALID_OPERATION and returned. If the output array had already been
 * created, *pReq is reset to NULL so the caller never sees entries whose table arrays
 * were freed.
 */
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
  if (NULL == pHash) {
    return terrno;
  }

  bool    inEscape = false;
  int32_t code = 0;
  void*   pIter = NULL;
  SArray* pList = NULL;  // output array, tracked locally so the failure path can release it

  // vPos/vLen describe the (up to two) name parts of the item being scanned; -1 = not started
  int32_t vIdx = 0;
  int32_t vPos[2];
  int32_t vLen[2];

  (void)memset(vPos, -1, sizeof(vPos));
  (void)memset(vLen, 0, sizeof(vLen));

  for (int32_t i = 0;; ++i) {
    const char c = *(tbList + i);

    if (0 == c) {
      // end of input: close the open part and emit the last item
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      break;
    }

    if ('`' == c) {
      inEscape = !inEscape;
      if (!inEscape) {
        // closing quote: an empty quoted part is rejected
        if (vPos[vIdx] >= 0) {
          vLen[vIdx] = i - vPos[vIdx];
        } else {
          goto _return;
        }
      }

      continue;
    }

    if (inEscape) {
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    if ('.' == c) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      vIdx++;
      if (vIdx >= 2) {
        goto _return;  // more than one '.' in an item
      }
      continue;
    }

    if (',' == c) {
      if (vPos[vIdx] < 0) {
        goto _return;
      }
      if (vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }

      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
      if (code) {
        goto _return;
      }

      (void)memset(vPos, -1, sizeof(vPos));
      (void)memset(vLen, 0, sizeof(vLen));
      vIdx = 0;
      continue;
    }

    if (' ' == c || '\r' == c || '\t' == c || '\n' == c) {
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
        vLen[vIdx] = i - vPos[vIdx];
      }
      continue;
    }

    if (('a' <= c && 'z' >= c) || ('A' <= c && 'Z' >= c) || ('0' <= c && '9' >= c) || ('_' == c)) {
      if (vLen[vIdx] > 0) {
        goto _return;  // part was already closed (e.g. by whitespace)
      }
      if (vPos[vIdx] < 0) {
        vPos[vIdx] = i;
      }
      continue;
    }

    goto _return;
  }

  int32_t dbNum = taosHashGetSize(pHash);
  pList = taosArrayInit(dbNum, sizeof(STablesReq));
  *pReq = pList;
  if (NULL == pList) {  // test the allocation result, not the out-parameter itself
    goto _return;
  }
  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    if (NULL == taosArrayPush(pList, pDb)) {
      goto _return;
    }
    pIter = taosHashIterate(pHash, pIter);
  }

  // the table arrays now belong to the entries copied into *pReq
  taosHashCleanup(pHash);

  return TSDB_CODE_SUCCESS;

_return:

  terrno = TSDB_CODE_TSC_INVALID_OPERATION;

  pIter = taosHashIterate(pHash, NULL);
  while (pIter) {
    STablesReq* pDb = (STablesReq*)pIter;
    taosArrayDestroy(pDb->pTables);
    pIter = taosHashIterate(pHash, pIter);
  }

  taosHashCleanup(pHash);

  if (pList != NULL) {
    // its elements reference the table arrays destroyed above
    taosArrayDestroy(pList);
    *pReq = NULL;
  }

  return terrno;
}
2864

2865
/*
 * Catalog completion callback for synchronous callers: records code on the request held in
 * param (an SSyncQueryParam) and posts its semaphore. pResult is not used.
 */
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
  SSyncQueryParam* pSync = (SSyncQueryParam*)param;

  pSync->pRequest->code = code;

  int32_t postRet = tsem_post(&pSync->sem);
  if (postRet != TSDB_CODE_SUCCESS) {
    tscError("failed to post semaphore since %s", tstrerror(terrno));
  }
}
3✔
2873

2874
void syncQueryFn(void* param, void* res, int32_t code) {
10,930,147✔
2875
  SSyncQueryParam* pParam = param;
10,930,147✔
2876
  pParam->pRequest = res;
10,930,147✔
2877

2878
  if (pParam->pRequest) {
10,930,147✔
2879
    pParam->pRequest->code = code;
10,928,211✔
2880
    clientOperateReport(pParam->pRequest);
10,928,211✔
2881
  }
2882

2883
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
10,931,477!
2884
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
2885
  }
2886
}
10,939,276✔
2887

2888
/*
 * Start an asynchronous query on connection connId.
 *
 * fp is invoked with (param, NULL, terrno) when the SQL text is NULL, longer than
 * TSDB_MAX_ALLOWED_SQL_LEN, or the request cannot be built; terrno is set accordingly.
 * With a NULL fp only terrno is set. Otherwise the request is tagged with source and fp and
 * handed to doAsyncQuery().
 */
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                        int8_t source) {
  if (NULL == fp) {
    // nobody to notify
    terrno = TSDB_CODE_INVALID_PARA;
    return;
  }

  if (NULL == sql) {
    terrno = TSDB_CODE_INVALID_PARA;
    fp(param, NULL, terrno);
    return;
  }

  const size_t sqlLen = strlen(sql);
  if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  SRequestObj*  pRequest = NULL;
  const int32_t code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
  if (TSDB_CODE_SUCCESS != code) {
    terrno = code;
    fp(param, NULL, terrno);
    return;
  }

  pRequest->source = source;
  pRequest->body.queryFp = fp;
  doAsyncQuery(pRequest, false);
}
2919
/*
 * Same as an asynchronous query start, but the request is built with the caller-supplied
 * reqid. Invalid arguments, an over-long SQL text or a failed buildRequest() set terrno and,
 * when fp is available, report it through fp(param, NULL, terrno).
 */
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
                                 int64_t reqid) {
  const bool badArgs = (sql == NULL) || (fp == NULL);
  if (badArgs) {
    terrno = TSDB_CODE_INVALID_PARA;
    if (fp != NULL) {
      fp(param, NULL, terrno);
    }
    return;
  }

  size_t textLen = strlen(sql);
  if (textLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
    tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
    fp(param, NULL, terrno);
    return;
  }

  SRequestObj* pReq = NULL;
  int32_t      rc = buildRequest(connId, sql, textLen, param, validateOnly, &pReq, reqid);
  if (rc != TSDB_CODE_SUCCESS) {
    terrno = rc;
    fp(param, NULL, terrno);
    return;
  }

  pReq->body.queryFp = fp;
  doAsyncQuery(pReq, false);
}
2949

2950
/*
 * Run a query synchronously: start it through taosAsyncQueryImpl() with syncQueryFn as the
 * callback and block on a semaphore until that callback fires.
 *
 * Returns the request object produced by the query (marked syncQuery, inCallback cleared), or
 * NULL when taos is NULL (terrno = TSDB_CODE_TSC_DISCONNECTED), when the helper structure
 * cannot be allocated (terrno = TSDB_CODE_OUT_OF_MEMORY), when the semaphore cannot be
 * initialised or waited on, or when the query produced no request.
 */
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
  if (taos == NULL) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  tscDebug("taos_query start with sql:%s", sql);

  SSyncQueryParam* pSync = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (pSync == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  int32_t rc = tsem_init(&pSync->sem, 0, 0);
  if (rc != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  // the connection handle is dereferenced as an int64 id
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, pSync, validateOnly, source);

  rc = tsem_wait(&pSync->sem);
  if (rc != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pSync);
    return NULL;
  }

  rc = tsem_destroy(&pSync->sem);
  if (rc != TSDB_CODE_SUCCESS) {
    tscError("failed to destroy semaphore since %s", tstrerror(rc));
  }

  SRequestObj* pResult = pSync->pRequest;
  if (pResult != NULL) {
    pResult->syncQuery = true;
    pResult->inCallback = false;
  }
  taosMemoryFree(pSync);

  tscDebug("taos_query end with sql:%s", sql);

  return pResult;
}
2992

2993
/*
 * Run a query synchronously with a caller-supplied request id: start it through
 * taosAsyncQueryImplWithReqid() with syncQueryFn as the callback and block on a semaphore
 * until that callback fires.
 *
 * Returns the request object produced by the query (marked syncQuery), or NULL when taos is
 * NULL (terrno = TSDB_CODE_TSC_DISCONNECTED), when the helper structure cannot be allocated
 * (terrno = TSDB_CODE_OUT_OF_MEMORY), when the semaphore cannot be initialised or waited on,
 * or when the query produced no request.
 */
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
  if (NULL == taos) {
    terrno = TSDB_CODE_TSC_DISCONNECTED;
    return NULL;
  }

  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
  if (param == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }
  int32_t code = tsem_init(&param->sem, 0, 0);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  // the connection handle is dereferenced as an int64 id
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
  code = tsem_wait(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    taosMemoryFree(param);
    return NULL;
  }

  // release the semaphore before its storage is freed, as the non-reqid variant does
  code = tsem_destroy(&param->sem);
  if (TSDB_CODE_SUCCESS != code) {
    tscError("failed to destroy semaphore since %s", tstrerror(code));
  }

  SRequestObj* pRequest = NULL;
  if (param->pRequest != NULL) {
    param->pRequest->syncQuery = true;
    pRequest = param->pRequest;
  }
  taosMemoryFree(param);
  return pRequest;
}
3024

3025
static void fetchCallback(void* pResult, void* param, int32_t code) {
994,679✔
3026
  SRequestObj* pRequest = (SRequestObj*)param;
994,679✔
3027

3028
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
994,679✔
3029

3030
  tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
994,679✔
3031
           pRequest->requestId);
3032

3033
  pResultInfo->pData = pResult;
994,677✔
3034
  pResultInfo->numOfRows = 0;
994,677✔
3035

3036
  if (code != TSDB_CODE_SUCCESS) {
994,677!
3037
    pRequest->code = code;
×
3038
    taosMemoryFreeClear(pResultInfo->pData);
×
3039
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3040
    return;
×
3041
  }
3042

3043
  if (pRequest->code != TSDB_CODE_SUCCESS) {
994,677!
3044
    taosMemoryFreeClear(pResultInfo->pData);
×
3045
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3046
    return;
×
3047
  }
3048

3049
  pRequest->code =
994,697✔
3050
      setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4);
994,677✔
3051
  if (pRequest->code != TSDB_CODE_SUCCESS) {
994,697!
3052
    pResultInfo->numOfRows = 0;
×
3053
    tscError("0x%" PRIx64 " fetch results failed, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(pRequest->code),
×
3054
             pRequest->requestId);
3055
  } else {
3056
    tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d,QID:0x%" PRIx64,
994,697✔
3057
             pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
3058
             pRequest->requestId);
3059

3060
    STscObj*            pTscObj = pRequest->pTscObj;
994,696✔
3061
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
994,696✔
3062
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
994,696✔
3063
  }
3064

3065
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
994,698✔
3066
}
3067

3068
/*
 * Start an asynchronous fetch of the next result batch for pRequest.
 *
 * fp is stored as the request's fetch callback and param as the user
 * parameter of the request's SSyncQueryParam. The callback is then either
 * invoked right away from this function (no result columns, request already
 * failed, or result already complete) or the fetch is handed to the scheduler
 * with fetchCallback registered as its completion callback.
 *
 * The third argument passed to the callback on the immediate paths is
 * pRequest->body.resInfo.numOfRows.
 */
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
  SReqResultInfo* pResInfo = &pRequest->body.resInfo;

  pRequest->body.fetchFp = fp;
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;

  // Nothing to fetch: the statement has no result columns, or the request already carries an error.
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
    pResInfo->numOfRows = 0;
    pRequest->body.fetchFp(param, pRequest, pResInfo->numOfRows);
    return;
  }

  // The whole result has already been produced; do not go back to the scheduler.
  if (pResInfo->completed) {
    if (QUERY_EXEC_MODE_SCHEDULE == pRequest->body.execMode) {
      pResInfo->numOfRows = 0;
    } else if (!pResInfo->localResultFetched) {
      // Not a scheduled query: the first fetch reports numOfRows as it currently stands
      // and only records that the local result has been handed out.
      pResInfo->localResultFetched = true;
    } else {
      // Subsequent fetches of a locally executed query report an empty batch.
      pResInfo->numOfRows = 0;
      pResInfo->current = 0;
    }

    pRequest->body.fetchFp(param, pRequest, pResInfo->numOfRows);
    return;
  }

  SSchedulerReq fetchReq = {
      .syncReq = false,
      .fetchFp = fetchCallback,
      .cbParam = pRequest,
  };

  if (schedulerFetchRows(pRequest->body.queryJob, &fetchReq) != TSDB_CODE_SUCCESS) {
    // NOTE(review): the user callback is not invoked on this path. Presumably the scheduler
    // reports the failure through fetchCallback itself - confirm before adding a call here.
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
  }
}
3111

3112
/*
 * Invoke the request's query callback with the given result code.
 *
 * pRequest->inCallback is set for the duration of the callback. When
 * tsQueryTbNotExistAsEmpty is enabled and the request is a query whose code is
 * one of the two "table not exist" errors, the code is replaced by
 * TSDB_CODE_SUCCESS and the request type becomes TSDB_SQL_RETRIEVE_EMPTY_RESULT
 * before the callback runs.
 */
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
  pRequest->inCallback = true;

  // Remember the request id now; it is used to look the request up again after the callback.
  const int64_t reqRefId = pRequest->self;

  const bool tableMissing =
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) || (code == TSDB_CODE_TDB_TABLE_NOT_EXIST);

  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && tableMissing) {
    code = TSDB_CODE_SUCCESS;
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
  }

  SSyncQueryParam* pInterParam = (SSyncQueryParam*)pRequest->body.interParam;
  pRequest->body.queryFp(pInterParam->userParam, pRequest, code);

  // The flag is cleared through a freshly acquired reference rather than through pRequest;
  // presumably the callback may have released the request - TODO confirm.
  SRequestObj* pAcquired = acquireRequest(reqRefId);
  if (pAcquired == NULL) {
    return;
  }

  pAcquired->inCallback = false;
  (void)releaseRequest(reqRefId);
}
10,928,814✔
3127

3128
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
665✔
3129
                       SParseSqlRes* pRes) {
3130
#ifndef TD_ENTERPRISE
3131
  return TSDB_CODE_SUCCESS;
3132
#else
3133
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
665✔
3134
#endif
3135
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc