• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5085

17 May 2026 01:15AM UTC coverage: 73.334% (-0.03%) from 73.366%
#5085

push

travis-ci

web-flow
feat (TDgpt): Dynamic Model Synchronization Enhancements (#35344)

* refactor: do some internal refactor.

* fix: fix multiprocess sync issue.

* feat: add dynamic anomaly detection and forecasting services

* fix: log error message for undeploying model in exception handling

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* Potential fix for pull request finding

Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>

* fix: handle undeploy when model exists only on disk

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/286aafa0-c3ce-4c27-b803-2707571e9dc1

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: guard dynamic registry concurrent access

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: tighten service list locking scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/5e4db858-6458-40f4-ac28-d1b1b7f97c18

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: restore prophet support and update tests per review feedback

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* fix: improve test name and move copy inside lock scope

Agent-Logs-Url: https://github.com/taosdata/TDengine/sessions/92298ae1-7da6-4d07-b20e-101c7cd0b26b

Co-authored-by: hjxilinx <8252296+hjxilinx@users.noreply.github.com>

* Potential fix for pull request finding

Co-au... (continued)

281451 of 383795 relevant lines covered (73.33%)

137008077.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.25
/source/client/src/clientImpl.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "cJSON.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "command.h"
22
#include "decimal.h"
23
#include "scheduler.h"
24
#include "tdatablock.h"
25
#include "tdataformat.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tmisce.h"
29
#include "tmsg.h"
30
#include "tmsgtype.h"
31
#include "tpagedbuf.h"
32
#include "tref.h"
33
#include "tsched.h"
34
#include "tversion.h"
35

36
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
37
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode);
38

39
void setQueryRequest(int64_t rId) {
731,088,822✔
40
  SRequestObj* pReq = acquireRequest(rId);
731,088,822✔
41
  if (pReq != NULL) {
731,117,323✔
42
    pReq->isQuery = true;
731,088,785✔
43
    (void)releaseRequest(rId);
731,088,057✔
44
  }
45
}
731,105,334✔
46

47
static bool stringLengthCheck(const char* str, size_t maxsize) {
189,785,231✔
48
  if (str == NULL) {
189,785,231✔
49
    return false;
×
50
  }
51

52
  size_t len = strlen(str);
189,785,231✔
53
  if (len <= 0 || len > maxsize) {
189,785,231✔
54
    return false;
25✔
55
  }
56

57
  return true;
189,786,606✔
58
}
59

60
static bool validateUserName(const char* user) { return stringLengthCheck(user, TSDB_USER_LEN - 1); }
94,654,938✔
61

62
static bool validatePassword(const char* passwd) { return stringLengthCheck(passwd, TSDB_PASSWORD_MAX_LEN); }
94,654,256✔
63

64
static bool validateDbName(const char* db) { return stringLengthCheck(db, TSDB_DB_NAME_LEN - 1); }
475,411✔
65

66
static char* getClusterKey(const char* user, const char* auth, const char* ip, int32_t port) {
94,654,876✔
67
  char key[512] = {0};
94,654,876✔
68
  if (user == NULL) {
94,654,876✔
69
    (void)snprintf(key, sizeof(key), "%s:%s:%d", auth, ip, port);
2,703✔
70
  } else {
71
    (void)snprintf(key, sizeof(key), "%s:%s:%s:%d", user, auth, ip, port);
94,652,173✔
72
  }
73
  return taosStrdup(key);
94,654,876✔
74
}
75

76
static int32_t escapeToPrinted(char* dst, size_t maxDstLength, const char* src, size_t srcLength) {
639,629✔
77
  if (dst == NULL || src == NULL || srcLength == 0) {
639,629✔
78
    return 0;
536✔
79
  }
80

81
  size_t escapeLength = 0;
639,093✔
82
  for (size_t i = 0; i < srcLength; ++i) {
18,104,112✔
83
    if (src[i] == '\"' || src[i] == '\\' || src[i] == '\b' || src[i] == '\f' || src[i] == '\n' || src[i] == '\r' ||
17,465,019✔
84
        src[i] == '\t') {
17,465,019✔
85
      escapeLength += 1;
×
86
    }
87
  }
88

89
  size_t dstLength = srcLength;
639,093✔
90
  if (escapeLength == 0) {
639,093✔
91
    (void)memcpy(dst, src, srcLength);
639,093✔
92
  } else {
93
    dstLength = 0;
×
94
    for (size_t i = 0; i < srcLength && dstLength <= maxDstLength; i++) {
×
95
      switch (src[i]) {
×
96
        case '\"':
×
97
          dst[dstLength++] = '\\';
×
98
          dst[dstLength++] = '\"';
×
99
          break;
×
100
        case '\\':
×
101
          dst[dstLength++] = '\\';
×
102
          dst[dstLength++] = '\\';
×
103
          break;
×
104
        case '\b':
×
105
          dst[dstLength++] = '\\';
×
106
          dst[dstLength++] = 'b';
×
107
          break;
×
108
        case '\f':
×
109
          dst[dstLength++] = '\\';
×
110
          dst[dstLength++] = 'f';
×
111
          break;
×
112
        case '\n':
×
113
          dst[dstLength++] = '\\';
×
114
          dst[dstLength++] = 'n';
×
115
          break;
×
116
        case '\r':
×
117
          dst[dstLength++] = '\\';
×
118
          dst[dstLength++] = 'r';
×
119
          break;
×
120
        case '\t':
×
121
          dst[dstLength++] = '\\';
×
122
          dst[dstLength++] = 't';
×
123
          break;
×
124
        default:
×
125
          dst[dstLength++] = src[i];
×
126
      }
127
    }
128
  }
129

130
  return dstLength;
639,093✔
131
}
132

133
bool chkRequestKilled(void* param) {
2,147,483,647✔
134
  bool         killed = false;
2,147,483,647✔
135
  SRequestObj* pRequest = acquireRequest((int64_t)param);
2,147,483,647✔
136
  if (NULL == pRequest || pRequest->killed) {
2,147,483,647✔
137
    killed = true;
2,378✔
138
  }
139

140
  (void)releaseRequest((int64_t)param);
2,147,483,647✔
141

142
  return killed;
2,147,483,647✔
143
}
144

145
void cleanupAppInfo() {
1,635,499✔
146
  taosHashCleanup(appInfo.pInstMap);
1,635,499✔
147
  taosHashCleanup(appInfo.pInstMapByClusterId);
1,635,499✔
148
  tscInfo("cluster instance map cleaned");
1,635,499✔
149
}
1,635,499✔
150

151
static int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db,
152
                               __taos_async_fn_t fp, void* param, SAppInstInfo* pAppInfo, int connType,
153
                               STscObj** pTscObj);
154

155
int32_t taos_connect_by_auth(const char* ip, const char* user, const char* auth, const char* totp, const char* db,
94,657,470✔
156
                             uint16_t port, int connType, STscObj** pObj) {
157
  TSC_ERR_RET(taos_init());
94,657,470✔
158

159
  if (user == NULL) {
94,658,290✔
160
    if (auth == NULL || strlen(auth) != (TSDB_TOKEN_LEN - 1)) {
3,636✔
161
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOKEN);
933✔
162
    }
163
  } else if (!validateUserName(user)) {
94,654,654✔
164
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_USER_LENGTH);
×
165
  }
166
  int32_t code = 0;
94,655,633✔
167

168
  char localDb[TSDB_DB_NAME_LEN] = {0};
94,655,633✔
169
  if (db != NULL && strlen(db) > 0) {
94,655,498✔
170
    if (!validateDbName(db)) {
475,411✔
171
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_DB_LENGTH);
×
172
    }
173

174
    tstrncpy(localDb, db, sizeof(localDb));
475,808✔
175
    (void)strdequote(localDb);
475,610✔
176
  }
177

178
  int32_t totpCode = -1;
94,655,647✔
179
  if (totp != NULL) {
94,655,647✔
180
    char* endptr = NULL;
2,458✔
181
    totpCode = taosStr2Int32(totp, &endptr, 10);
2,458✔
182
    if (endptr == totp || *endptr != '\0' || totpCode < 0 || totpCode > 999999) {
2,458✔
183
      TSC_ERR_RET(TSDB_CODE_TSC_INVALID_TOTP_CODE);
×
184
    }
185
  }
186

187
  SCorEpSet epSet = {0};
94,655,647✔
188
  if (ip) {
94,656,298✔
189
    TSC_ERR_RET(initEpSetFromCfg(ip, NULL, &epSet));
92,534,693✔
190
  } else {
191
    TSC_ERR_RET(initEpSetFromCfg(tsFirst, tsSecond, &epSet));
2,121,605✔
192
  }
193

194
  if (port) {
94,656,215✔
195
    epSet.epSet.eps[0].port = port;
91,663,831✔
196
    epSet.epSet.eps[1].port = port;
91,663,831✔
197
  }
198

199
  char* key = getClusterKey(user, auth, ip, port);
94,656,215✔
200
  if (NULL == key) {
94,657,528✔
201
    TSC_ERR_RET(terrno);
×
202
  }
203
  tscInfo("connecting to server, numOfEps:%d inUse:%d user:%s db:%s key:%s", epSet.epSet.numOfEps, epSet.epSet.inUse,
94,657,528✔
204
          user ? user : "", db, key);
205
  for (int32_t i = 0; i < epSet.epSet.numOfEps; ++i) {
191,438,481✔
206
    tscInfo("ep:%d, %s:%u", i, epSet.epSet.eps[i].fqdn, epSet.epSet.eps[i].port);
96,780,285✔
207
  }
208

209
  SAppInstInfo** pInst = NULL;
94,658,196✔
210
  code = taosThreadMutexLock(&appInfo.mutex);
94,658,196✔
211
  if (TSDB_CODE_SUCCESS != code) {
94,658,369✔
212
    tscError("failed to lock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
213
    TSC_ERR_RET(code);
×
214
  }
215

216
  pInst = taosHashGet(appInfo.pInstMap, key, strlen(key));
94,658,369✔
217
  SAppInstInfo* p = NULL;
94,658,369✔
218
  if (pInst == NULL) {
94,658,369✔
219
    p = taosMemoryCalloc(1, sizeof(struct SAppInstInfo));
1,736,754✔
220
    if (NULL == p) {
1,736,754✔
221
      TSC_ERR_JRET(terrno);
×
222
    }
223
    p->mgmtEp = epSet;
1,736,754✔
224
    code = taosThreadMutexInit(&p->qnodeMutex, NULL);
1,736,754✔
225
    if (TSDB_CODE_SUCCESS != code) {
1,736,754✔
226
      taosMemoryFree(p);
×
227
      TSC_ERR_JRET(code);
×
228
    }
229
    code = openTransporter(user, auth, tsNumOfCores / 2, &p->pTransporter);
1,736,754✔
230
    if (TSDB_CODE_SUCCESS != code) {
1,736,754✔
231
      taosMemoryFree(p);
202✔
232
      TSC_ERR_JRET(code);
202✔
233
    }
234
    code = appHbMgrInit(p, key, &p->pAppHbMgr);
1,736,552✔
235
    if (TSDB_CODE_SUCCESS != code) {
1,736,552✔
236
      destroyAppInst(&p);
×
237
      TSC_ERR_JRET(code);
×
238
    }
239
    code = taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
1,736,552✔
240
    if (TSDB_CODE_SUCCESS != code) {
1,736,552✔
241
      destroyAppInst(&p);
×
242
      TSC_ERR_JRET(code);
×
243
    }
244
    p->instKey = key;
1,736,552✔
245
    key = NULL;
1,736,552✔
246
    tscInfo("new app inst mgr:%p, user:%s, ip:%s, port:%d", p, user ? user : "", epSet.epSet.eps[0].fqdn,
1,736,552✔
247
            epSet.epSet.eps[0].port);
248

249
    pInst = &p;
1,736,552✔
250
  } else {
251
    if (NULL == *pInst || NULL == (*pInst)->pAppHbMgr) {
92,921,615✔
252
      tscError("*pInst:%p, pAppHgMgr:%p", *pInst, (*pInst) ? (*pInst)->pAppHbMgr : NULL);
×
253
      TSC_ERR_JRET(TSDB_CODE_TSC_INTERNAL_ERROR);
×
254
    }
255
    // reset to 0 in case of conn with duplicated user key but its user has ever been dropped.
256
    atomic_store_8(&(*pInst)->pAppHbMgr->connHbFlag, 0);
92,921,615✔
257
  }
258

259
_return:
94,658,369✔
260

261
  if (TSDB_CODE_SUCCESS != code) {
94,658,369✔
262
    (void)taosThreadMutexUnlock(&appInfo.mutex);
202✔
263
    taosMemoryFreeClear(key);
202✔
264
    return code;
202✔
265
  } else {
266
    code = taosThreadMutexUnlock(&appInfo.mutex);
94,658,167✔
267
    taosMemoryFreeClear(key);
94,656,927✔
268
    if (TSDB_CODE_SUCCESS != code) {
94,658,167✔
269
      tscError("failed to unlock app info, code:%s", tstrerror(TAOS_SYSTEM_ERROR(code)));
×
270
      return code;
×
271
    }
272
    return taosConnectImpl(user, auth, totpCode, localDb, NULL, NULL, *pInst, connType, pObj);
94,658,167✔
273
  }
274
}
275

276
int32_t taos_connect_internal(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
94,652,237✔
277
                              uint16_t port, int connType, STscObj** pObj) {
278
  char auth[TSDB_PASSWORD_LEN + 1] = {0};
94,652,237✔
279
  if (!validatePassword(pass)) {
94,652,237✔
280
    TSC_ERR_RET(TSDB_CODE_TSC_INVALID_PASS_LENGTH);
×
281
  }
282

283
  taosEncryptPass_c((uint8_t*)pass, strlen(pass), auth);
94,655,101✔
284
  return taos_connect_by_auth(ip, user, auth, totp, db, port, connType, pObj);
94,655,096✔
285
}
286

287
// SAppInstInfo* getAppInstInfo(const char* clusterKey) {
288
//   SAppInstInfo** ppAppInstInfo = taosHashGet(appInfo.pInstMap, clusterKey, strlen(clusterKey));
289
//   if (ppAppInstInfo != NULL && *ppAppInstInfo != NULL) {
290
//     return *ppAppInstInfo;
291
//   } else {
292
//     return NULL;
293
//   }
294
// }
295

296
void freeQueryParam(SSyncQueryParam* param) {
582,025✔
297
  if (param == NULL) return;
582,025✔
298
  if (TSDB_CODE_SUCCESS != tsem_destroy(&param->sem)) {
582,025✔
299
    tscError("failed to destroy semaphore in freeQueryParam");
×
300
  }
301
  taosMemoryFree(param);
582,025✔
302
}
303

304
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
1,225,744,905✔
305
                     SRequestObj** pRequest, int64_t reqid) {
306
  int32_t code = createRequest(connId, TSDB_SQL_SELECT, reqid, pRequest);
1,225,744,905✔
307
  if (TSDB_CODE_SUCCESS != code) {
1,225,751,614✔
308
    tscError("failed to malloc sqlObj, %s", sql);
1,331✔
309
    return code;
1,331✔
310
  }
311

312
  (*pRequest)->sqlstr = taosMemoryMalloc(sqlLen + 1);
1,225,750,283✔
313
  if ((*pRequest)->sqlstr == NULL) {
1,225,729,238✔
314
    tscError("req:0x%" PRIx64 ", failed to prepare sql string buffer, %s", (*pRequest)->self, sql);
×
315
    destroyRequest(*pRequest);
×
316
    *pRequest = NULL;
×
317
    return terrno;
×
318
  }
319

320
  (void)strntolower((*pRequest)->sqlstr, sql, (int32_t)sqlLen);
1,225,748,133✔
321
  (*pRequest)->sqlstr[sqlLen] = 0;
1,225,806,342✔
322
  (*pRequest)->sqlLen = sqlLen;
1,225,809,868✔
323
  (*pRequest)->validateOnly = validateSql;
1,225,810,390✔
324
  (*pRequest)->stmtBindVersion = 0;
1,225,807,072✔
325

326
  code = sqlSecurityCheckStringLevel(*pRequest, (*pRequest)->sqlstr, (*pRequest)->sqlLen);
1,225,795,455✔
327
  if (code != TSDB_CODE_SUCCESS) {
1,225,727,275✔
328
    tscWarn("req:0x%" PRIx64 ", sql security string check failed, QID:0x%" PRIx64 ", code:%s", (*pRequest)->self,
1,732✔
329
            (*pRequest)->requestId, tstrerror(code));
330
    destroyRequest(*pRequest);
1,732✔
331
    *pRequest = NULL;
1,732✔
332
    return code;
1,732✔
333
  }
334

335
  ((SSyncQueryParam*)(*pRequest)->body.interParam)->userParam = param;
1,225,725,543✔
336

337
  STscObj* pTscObj = (*pRequest)->pTscObj;
1,225,782,666✔
338
  int32_t  err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
1,225,761,630✔
339
                             sizeof((*pRequest)->self));
340
  if (err) {
1,225,759,314✔
341
    tscError("req:0x%" PRId64 ", failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
342
             (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
343
    destroyRequest(*pRequest);
×
344
    *pRequest = NULL;
×
345
    return terrno;
×
346
  }
347

348
  (*pRequest)->allocatorRefId = -1;
1,225,759,314✔
349
  if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
1,225,751,549✔
350
    if (TSDB_CODE_SUCCESS !=
497,515,998✔
351
        nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
497,512,938✔
352
      tscError("req:0x%" PRId64 ", failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
×
353
               (*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
354
      destroyRequest(*pRequest);
×
355
      *pRequest = NULL;
×
356
      return terrno;
×
357
    }
358
  }
359

360
  tscDebug("req:0x%" PRIx64 ", build request, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->requestId);
1,225,821,646✔
361
  return TSDB_CODE_SUCCESS;
1,225,725,171✔
362
}
363

364
int32_t buildPreviousRequest(SRequestObj* pRequest, const char* sql, SRequestObj** pNewRequest) {
×
365
  int32_t code =
366
      buildRequest(pRequest->pTscObj->id, sql, strlen(sql), pRequest, pRequest->validateOnly, pNewRequest, 0);
×
367
  if (TSDB_CODE_SUCCESS == code) {
×
368
    pRequest->relation.prevRefId = (*pNewRequest)->self;
×
369
    (*pNewRequest)->relation.nextRefId = pRequest->self;
×
370
    (*pNewRequest)->relation.userRefId = pRequest->self;
×
371
    (*pNewRequest)->isSubReq = true;
×
372
  }
373
  return code;
×
374
}
375

376
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb) {
7,884,434✔
377
  STscObj* pTscObj = pRequest->pTscObj;
7,884,434✔
378

379
  SParseContext cxt = {
7,888,638✔
380
      .requestId = pRequest->requestId,
7,887,880✔
381
      .requestRid = pRequest->self,
7,884,380✔
382
      .acctId = pTscObj->acctId,
7,884,823✔
383
      .db = pRequest->pDb,
7,886,410✔
384
      .topicQuery = topicQuery,
385
      .pSql = pRequest->sqlstr,
7,885,544✔
386
      .sqlLen = pRequest->sqlLen,
7,887,078✔
387
      .pMsg = pRequest->msgBuf,
7,886,698✔
388
      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
389
      .pTransporter = pTscObj->pAppInfo->pTransporter,
7,885,990✔
390
      .pStmtCb = pStmtCb,
391
      .pUser = pTscObj->user,
7,884,722✔
392
      .userId = pTscObj->userId,
7,881,648✔
393
      .isSuperUser = (0 == strcmp(pTscObj->user, TSDB_DEFAULT_USER)),
7,885,201✔
394
      .enableSysInfo = pTscObj->sysInfo,
7,883,221✔
395
      .minSecLevel = pTscObj->minSecLevel,
7,882,431✔
396
      .maxSecLevel = pTscObj->maxSecLevel,
7,884,759✔
397
      .macMode = pTscObj->pAppInfo->serverCfg.macActive,  // propagates cluster-level MAC state into parser/executor
7,882,768✔
398
      .sodInitial = pTscObj->pAppInfo->serverCfg.sodInitial,
7,885,580✔
399
      .svrVer = pTscObj->sVer,
7,888,666✔
400
      .nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
7,886,009✔
401
      .stmtBindVersion = pRequest->stmtBindVersion,
7,886,735✔
402
      .setQueryFp = setQueryRequest,
403
      .timezone = pTscObj->optionInfo.timezone,
7,886,785✔
404
      .charsetCxt = pTscObj->optionInfo.charsetCxt,
7,886,714✔
405
  };
406

407
  cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
7,884,443✔
408
  int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
7,888,702✔
409
  if (code != TSDB_CODE_SUCCESS) {
7,884,082✔
410
    return code;
×
411
  }
412

413
  code = qParseSql(&cxt, pQuery);
7,884,082✔
414
  if (TSDB_CODE_SUCCESS == code) {
7,879,090✔
415
    if ((*pQuery)->haveResultSet) {
7,876,233✔
416
      code = setResSchemaInfo(&pRequest->body.resInfo, (*pQuery)->pResSchema, (*pQuery)->numOfResCols,
441✔
417
                              (*pQuery)->pResExtSchema, pRequest->stmtBindVersion > 0);
441✔
418
      setResPrecision(&pRequest->body.resInfo, (*pQuery)->precision);
441✔
419
    }
420
  }
421

422
  if (TSDB_CODE_SUCCESS == code || NEED_CLIENT_HANDLE_ERROR(code)) {
7,879,516✔
423
    TSWAP(pRequest->dbList, (*pQuery)->pDbList);
7,874,172✔
424
    TSWAP(pRequest->tableList, (*pQuery)->pTableList);
7,880,568✔
425
    TSWAP(pRequest->targetTableList, (*pQuery)->pTargetTableList);
7,877,777✔
426
  }
427

428
  taosArrayDestroy(cxt.pTableMetaPos);
7,874,659✔
429
  taosArrayDestroy(cxt.pTableVgroupPos);
7,872,989✔
430

431
  return code;
7,877,547✔
432
}
433
#ifdef TD_ENTERPRISE
434
static uint8_t getShowVarPrivMask(SRequestObj* pRequest) {
17,931✔
435
  SCatalog*        pCatalog = NULL;
17,931✔
436
  SGetUserAuthRsp  authRsp = {0};
17,931✔
437
  STscObj*         pTscObj = pRequest->pTscObj;
17,931✔
438
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
17,931✔
439
                           .requestId = pRequest->requestId,
17,931✔
440
                           .requestObjRefId = pRequest->self,
17,931✔
441
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
17,931✔
442

443
  if (TSDB_CODE_SUCCESS != catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog)) {
17,931✔
444
    return 0;
×
445
  }
446
  if (TSDB_CODE_SUCCESS != catalogGetUserAuth(pCatalog, &conn, pTscObj->user, &authRsp)) {
17,931✔
447
    return 0;
×
448
  }
449

450
  uint8_t mask = 0;
17,931✔
451
  if (PRIV_HAS(&authRsp.sysPrivs, PRIV_VAR_SYSTEM_SHOW)) mask |= SHOW_VAR_PRIV_SYSTEM;
17,931✔
452
  if (PRIV_HAS(&authRsp.sysPrivs, PRIV_VAR_SECURITY_SHOW)) mask |= SHOW_VAR_PRIV_SECURITY;
17,931✔
453
  if (PRIV_HAS(&authRsp.sysPrivs, PRIV_VAR_AUDIT_SHOW)) mask |= SHOW_VAR_PRIV_AUDIT;
17,931✔
454
  if (PRIV_HAS(&authRsp.sysPrivs, PRIV_VAR_DEBUG_SHOW)) mask |= SHOW_VAR_PRIV_DEBUG;
17,931✔
455
  return mask;
17,931✔
456
}
457
#endif
458

459
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
×
460
  SRetrieveTableRsp* pRsp = NULL;
×
461
  int8_t             biMode = atomic_load_8(&pRequest->pTscObj->biMode);
×
462
  uint8_t            showVarPrivMask = SHOW_VAR_PRIV_ALL;
×
463
#ifdef TD_ENTERPRISE
464
  if (pQuery->pRoot != NULL && nodeType(pQuery->pRoot) == QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT) {
×
465
    showVarPrivMask = getShowVarPrivMask(pRequest);
×
466
  }
467
#endif
468
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, showVarPrivMask, pQuery->pRoot, &pRsp,
×
469
                              biMode, pRequest->pTscObj->optionInfo.charsetCxt);
×
470
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
×
471
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
×
472
                                 pRequest->stmtBindVersion > 0);
×
473
  }
474

475
  return code;
×
476
}
477

478
int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
460,015✔
479
  // drop table if exists not_exists_table
480
  if (NULL == pQuery->pCmdMsg) {
460,015✔
481
    return TSDB_CODE_SUCCESS;
×
482
  }
483

484
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
460,015✔
485
  pRequest->type = pMsgInfo->msgType;
460,015✔
486
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
460,015✔
487
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
460,015✔
488

489
  STscObj*      pTscObj = pRequest->pTscObj;
460,015✔
490
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
460,015✔
491

492
  // int64_t transporterId = 0;
493
  TSC_ERR_RET(asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg));
460,015✔
494
  TSC_ERR_RET(tsem_wait(&pRequest->body.rspSem));
460,015✔
495
  return TSDB_CODE_SUCCESS;
460,015✔
496
}
497

498
static SAppInstInfo* getAppInfo(SRequestObj* pRequest) { return pRequest->pTscObj->pAppInfo; }
2,072,768,293✔
499

500
void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
6,853,347✔
501
  SRetrieveTableRsp* pRsp = NULL;
6,853,347✔
502
  if (pRequest->validateOnly) {
6,853,347✔
503
    doRequestCallback(pRequest, 0);
11,610✔
504
    return;
11,610✔
505
  }
506

507
  uint8_t showVarPrivMask = SHOW_VAR_PRIV_ALL;
6,841,737✔
508
#ifdef TD_ENTERPRISE
509
  if (pQuery->pRoot != NULL && nodeType(pQuery->pRoot) == QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT) {
6,841,737✔
510
    showVarPrivMask = getShowVarPrivMask(pRequest);
17,931✔
511
  }
512
#endif
513
  int32_t code = qExecCommand(&pRequest->pTscObj->id, pRequest->pTscObj->sysInfo, showVarPrivMask, pQuery->pRoot, &pRsp,
13,653,935✔
514
                              atomic_load_8(&pRequest->pTscObj->biMode), pRequest->pTscObj->optionInfo.charsetCxt);
13,653,915✔
515
  if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
6,841,734✔
516
    code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, pRequest->body.resInfo.convertUcs4,
3,759,575✔
517
                                 pRequest->stmtBindVersion > 0);
3,759,575✔
518
  }
519

520
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
6,841,674✔
521
  pRequest->code = code;
6,841,653✔
522

523
  if (pRequest->code != TSDB_CODE_SUCCESS) {
6,841,656✔
524
    pResultInfo->numOfRows = 0;
2,222✔
525
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
2,222✔
526
             pRequest->requestId);
527
  } else {
528
    tscDebug(
6,839,451✔
529
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
530
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
531
  }
532

533
  doRequestCallback(pRequest, code);
6,841,673✔
534
}
535

536
int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
111,043,186✔
537
  if (pRequest->validateOnly) {
111,043,186✔
538
    doRequestCallback(pRequest, 0);
×
539
    return TSDB_CODE_SUCCESS;
×
540
  }
541

542
  // drop table if exists not_exists_table
543
  if (NULL == pQuery->pCmdMsg) {
111,043,186✔
544
    doRequestCallback(pRequest, 0);
8,010✔
545
    return TSDB_CODE_SUCCESS;
8,010✔
546
  }
547

548
  SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
111,035,176✔
549
  // Clear pQuery->pCmdMsg before the async call so that nodesDestroyNode (which may be
550
  // triggered by the async response callback on another thread) will not double-free pCmdMsg.
551
  pQuery->pCmdMsg = NULL;
111,035,113✔
552
  pRequest->type = pMsgInfo->msgType;
111,035,176✔
553
  pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
111,034,635✔
554
  pMsgInfo->pMsg = NULL;  // pMsg transferred to SMsgSendInfo management
111,034,996✔
555

556
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
111,035,045✔
557
  SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
111,034,869✔
558

559
  int32_t code = asyncSendMsgToServer(pAppInfo->pTransporter, &pMsgInfo->epSet, NULL, pSendMsg);
111,038,150✔
560
  // pMsgInfo->pMsg has been transferred to pRequest->body.requestMsg and pMsgInfo->epSet has
561
  // been consumed by asyncSendMsgToServer; the SCmdMsgInfo struct itself is no longer needed.
562
  // Use the local pMsgInfo variable (not pQuery->pCmdMsg) to avoid a use-after-free: the async
563
  // response callback may have run on another thread and destroyed pQuery by this point.
564
  taosMemoryFree(pMsgInfo);
111,040,341✔
565
  if (code) {
111,043,063✔
566
    doRequestCallback(pRequest, code);
×
567
  }
568
  return code;
111,043,521✔
569
}
570

571
int compareQueryNodeLoad(const void* elem1, const void* elem2) {
224,660✔
572
  SQueryNodeLoad* node1 = (SQueryNodeLoad*)elem1;
224,660✔
573
  SQueryNodeLoad* node2 = (SQueryNodeLoad*)elem2;
224,660✔
574

575
  if (node1->load < node2->load) {
224,660✔
576
    return -1;
×
577
  }
578

579
  return node1->load > node2->load;
224,660✔
580
}
581

582
int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
404,773✔
583
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
404,773✔
584
  if (pInfo->pQnodeList) {
404,773✔
585
    taosArrayDestroy(pInfo->pQnodeList);
398,501✔
586
    pInfo->pQnodeList = NULL;
398,501✔
587
    tscDebug("QnodeList cleared in cluster 0x%" PRIx64, pInfo->clusterId);
398,501✔
588
  }
589

590
  if (pNodeList) {
404,773✔
591
    pInfo->pQnodeList = taosArrayDup(pNodeList, NULL);
404,773✔
592
    taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
404,773✔
593
    tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%ld", pInfo->clusterId,
404,773✔
594
             taosArrayGetSize(pInfo->pQnodeList));
595
  }
596
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
404,773✔
597

598
  return TSDB_CODE_SUCCESS;
404,773✔
599
}
600

601
int32_t qnodeRequired(SRequestObj* pRequest, bool* required) {
1,218,688,683✔
602
  if (QUERY_POLICY_VNODE == tsQueryPolicy || QUERY_POLICY_CLIENT == tsQueryPolicy) {
1,218,688,683✔
603
    *required = false;
1,204,963,607✔
604
    return TSDB_CODE_SUCCESS;
1,204,970,201✔
605
  }
606

607
  int32_t       code = TSDB_CODE_SUCCESS;
13,725,076✔
608
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
13,725,076✔
609
  *required = false;
13,725,638✔
610

611
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
13,725,638✔
612
  *required = (NULL == pInfo->pQnodeList);
13,725,076✔
613
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
13,725,076✔
614
  return TSDB_CODE_SUCCESS;
13,725,076✔
615
}
616

617
int32_t getQnodeList(SRequestObj* pRequest, SArray** pNodeList) {
×
618
  SAppInstInfo* pInfo = pRequest->pTscObj->pAppInfo;
×
619
  int32_t       code = 0;
×
620

621
  TSC_ERR_RET(taosThreadMutexLock(&pInfo->qnodeMutex));
×
622
  if (pInfo->pQnodeList) {
×
623
    *pNodeList = taosArrayDup(pInfo->pQnodeList, NULL);
×
624
  }
625
  TSC_ERR_RET(taosThreadMutexUnlock(&pInfo->qnodeMutex));
×
626
  if (NULL == *pNodeList) {
×
627
    SCatalog* pCatalog = NULL;
×
628
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
629
    if (TSDB_CODE_SUCCESS == code) {
×
630
      *pNodeList = taosArrayInit(5, sizeof(SQueryNodeLoad));
×
631
      if (NULL == pNodeList) {
×
632
        TSC_ERR_RET(terrno);
×
633
      }
634
      SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
×
635
                               .requestId = pRequest->requestId,
×
636
                               .requestObjRefId = pRequest->self,
×
637
                               .mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp)};
×
638
      code = catalogGetQnodeList(pCatalog, &conn, *pNodeList);
×
639
    }
640

641
    if (TSDB_CODE_SUCCESS == code && *pNodeList) {
×
642
      code = updateQnodeList(pInfo, *pNodeList);
×
643
    }
644
  }
645

646
  return code;
×
647
}
648

649
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
9,379,882✔
650
  pRequest->type = pQuery->msgType;
9,379,882✔
651
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
9,382,224✔
652

653
  SPlanContext cxt = {.queryId = pRequest->requestId,
9,767,358✔
654
                      .acctId = pRequest->pTscObj->acctId,
9,379,179✔
655
                      .mgmtEpSet = getEpSet_s(&pAppInfo->mgmtEp),
9,377,248✔
656
                      .pAstRoot = pQuery->pRoot,
9,385,028✔
657
                      .showRewrite = pQuery->showRewrite,
9,382,019✔
658
                      .pMsg = pRequest->msgBuf,
9,383,503✔
659
                      .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
660
                      .pUser = pRequest->pTscObj->user,
9,379,600✔
661
                      .userId = pRequest->pTscObj->userId,
9,370,573✔
662
                      .timezone = pRequest->pTscObj->optionInfo.timezone,
9,379,330✔
663
                      .sysInfo = pRequest->pTscObj->sysInfo,
9,376,643✔
664
                      .minSecLevel = pRequest->pTscObj->minSecLevel,
9,379,437✔
665
                      .maxSecLevel = pRequest->pTscObj->maxSecLevel,
9,374,737✔
666
                      .macMode = pAppInfo->serverCfg.macActive};
9,380,476✔
667

668
  return qCreateQueryPlan(&cxt, pPlan, pNodeList);
9,374,352✔
669
}
670

671
int32_t setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols,
306,932,696✔
672
                         const SExtSchema* pExtSchema, bool isStmt) {
673
  if (pResInfo == NULL || pSchema == NULL || numOfCols <= 0) {
306,932,696✔
674
    tscError("invalid paras, pResInfo == NULL || pSchema == NULL || numOfCols <= 0");
×
675
    return TSDB_CODE_INVALID_PARA;
×
676
  }
677

678
  pResInfo->numOfCols = numOfCols;
306,943,466✔
679
  if (pResInfo->fields != NULL) {
306,944,794✔
680
    taosMemoryFree(pResInfo->fields);
16,541✔
681
  }
682
  if (pResInfo->userFields != NULL) {
306,941,439✔
683
    taosMemoryFree(pResInfo->userFields);
16,541✔
684
  }
685
  pResInfo->fields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD_E));
306,942,283✔
686
  if (NULL == pResInfo->fields) return terrno;
306,933,552✔
687
  pResInfo->userFields = taosMemoryCalloc(numOfCols, sizeof(TAOS_FIELD));
306,938,247✔
688
  if (NULL == pResInfo->userFields) {
306,929,092✔
689
    taosMemoryFree(pResInfo->fields);
×
690
    return terrno;
×
691
  }
692
  if (numOfCols != pResInfo->numOfCols) {
306,930,560✔
693
    tscError("numOfCols:%d != pResInfo->numOfCols:%d", numOfCols, pResInfo->numOfCols);
×
694
    return TSDB_CODE_FAILED;
×
695
  }
696

697
  for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
1,577,005,800✔
698
    pResInfo->fields[i].type = pSchema[i].type;
1,270,057,520✔
699

700
    pResInfo->userFields[i].type = pSchema[i].type;
1,270,063,125✔
701
    // userFields must convert to type bytes, no matter isStmt or not
702
    pResInfo->userFields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, false);
1,270,067,078✔
703
    pResInfo->fields[i].bytes = calcTypeBytesFromSchemaBytes(pSchema[i].type, pSchema[i].bytes, isStmt);
1,270,060,940✔
704
    if (IS_DECIMAL_TYPE(pSchema[i].type) && pExtSchema) {
1,270,078,967✔
705
      decimalFromTypeMod(pExtSchema[i].typeMod, &pResInfo->fields[i].precision, &pResInfo->fields[i].scale);
1,616,974✔
706
    }
707

708
    tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name));
1,270,075,861✔
709
    tstrncpy(pResInfo->userFields[i].name, pSchema[i].name, tListLen(pResInfo->userFields[i].name));
1,270,079,684✔
710
  }
711
  return TSDB_CODE_SUCCESS;
306,948,187✔
712
}
713

714
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
218,146,795✔
715
  if (precision != TSDB_TIME_PRECISION_MILLI && precision != TSDB_TIME_PRECISION_MICRO &&
218,146,795✔
716
      precision != TSDB_TIME_PRECISION_NANO) {
717
    return;
×
718
  }
719

720
  pResInfo->precision = precision;
218,146,795✔
721
}
722

723
int32_t buildVnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pDbVgList) {
223,076,159✔
724
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
223,076,159✔
725
  if (NULL == nodeList) {
223,092,983✔
726
    return terrno;
×
727
  }
728
  char* policy = (tsQueryPolicy == QUERY_POLICY_VNODE) ? "vnode" : "client";
223,098,236✔
729

730
  int32_t dbNum = taosArrayGetSize(pDbVgList);
223,098,236✔
731
  for (int32_t i = 0; i < dbNum; ++i) {
443,434,570✔
732
    SArray* pVg = taosArrayGetP(pDbVgList, i);
220,315,513✔
733
    if (NULL == pVg) {
220,322,697✔
734
      continue;
×
735
    }
736
    int32_t vgNum = taosArrayGetSize(pVg);
220,322,697✔
737
    if (vgNum <= 0) {
220,321,744✔
738
      continue;
681,607✔
739
    }
740

741
    for (int32_t j = 0; j < vgNum; ++j) {
766,549,284✔
742
      SVgroupInfo* pInfo = taosArrayGet(pVg, j);
546,899,872✔
743
      if (NULL == pInfo) {
546,913,523✔
744
        taosArrayDestroy(nodeList);
×
745
        return TSDB_CODE_OUT_OF_RANGE;
×
746
      }
747
      SQueryNodeLoad load = {0};
546,913,523✔
748
      load.addr.nodeId = pInfo->vgId;
546,905,095✔
749
      load.addr.epSet = pInfo->epSet;
546,916,448✔
750

751
      if (NULL == taosArrayPush(nodeList, &load)) {
546,865,989✔
752
        taosArrayDestroy(nodeList);
×
753
        return terrno;
×
754
      }
755
    }
756
  }
757

758
  int32_t vnodeNum = taosArrayGetSize(nodeList);
223,119,057✔
759
  if (vnodeNum > 0) {
223,108,852✔
760
    tscDebug("0x%" PRIx64 " %s policy, use vnode list, num:%d", pRequest->requestId, policy, vnodeNum);
219,289,162✔
761
    goto _return;
219,286,877✔
762
  }
763

764
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
3,819,690✔
765
  if (mnodeNum <= 0) {
3,816,700✔
766
    tscDebug("0x%" PRIx64 " %s policy, empty node list", pRequest->requestId, policy);
×
767
    goto _return;
×
768
  }
769

770
  void* pData = taosArrayGet(pMnodeList, 0);
3,816,700✔
771
  if (NULL == pData) {
3,816,700✔
772
    taosArrayDestroy(nodeList);
×
773
    return TSDB_CODE_OUT_OF_RANGE;
×
774
  }
775
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
3,816,700✔
776
    taosArrayDestroy(nodeList);
×
777
    return terrno;
×
778
  }
779

780
  tscDebug("0x%" PRIx64 " %s policy, use mnode list, num:%d", pRequest->requestId, policy, mnodeNum);
3,816,700✔
781

782
_return:
141,804✔
783

784
  *pNodeList = nodeList;
223,100,866✔
785

786
  return TSDB_CODE_SUCCESS;
223,096,598✔
787
}
788

789
int32_t buildQnodePolicyNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SArray* pQnodeList) {
1,739,986✔
790
  SArray* nodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
1,739,986✔
791
  if (NULL == nodeList) {
1,739,986✔
792
    return terrno;
×
793
  }
794

795
  int32_t qNodeNum = taosArrayGetSize(pQnodeList);
1,739,986✔
796
  if (qNodeNum > 0) {
1,739,986✔
797
    void* pData = taosArrayGet(pQnodeList, 0);
1,712,976✔
798
    if (NULL == pData) {
1,712,976✔
799
      taosArrayDestroy(nodeList);
×
800
      return TSDB_CODE_OUT_OF_RANGE;
×
801
    }
802
    if (NULL == taosArrayAddBatch(nodeList, pData, qNodeNum)) {
1,712,976✔
803
      taosArrayDestroy(nodeList);
×
804
      return terrno;
×
805
    }
806
    tscDebug("0x%" PRIx64 " qnode policy, use qnode list, num:%d", pRequest->requestId, qNodeNum);
1,712,976✔
807
    goto _return;
1,712,976✔
808
  }
809

810
  int32_t mnodeNum = taosArrayGetSize(pMnodeList);
27,010✔
811
  if (mnodeNum <= 0) {
27,010✔
812
    tscDebug("0x%" PRIx64 " qnode policy, empty node list", pRequest->requestId);
×
813
    goto _return;
×
814
  }
815

816
  void* pData = taosArrayGet(pMnodeList, 0);
27,010✔
817
  if (NULL == pData) {
27,010✔
818
    taosArrayDestroy(nodeList);
×
819
    return TSDB_CODE_OUT_OF_RANGE;
×
820
  }
821
  if (NULL == taosArrayAddBatch(nodeList, pData, mnodeNum)) {
27,010✔
822
    taosArrayDestroy(nodeList);
×
823
    return terrno;
×
824
  }
825

826
  tscDebug("0x%" PRIx64 " qnode policy, use mnode list, num:%d", pRequest->requestId, mnodeNum);
27,010✔
827

828
_return:
×
829

830
  *pNodeList = nodeList;
1,739,986✔
831

832
  return TSDB_CODE_SUCCESS;
1,739,986✔
833
}
834

835
void freeVgList(void* list) {
9,245,077✔
836
  SArray* pList = *(SArray**)list;
9,245,077✔
837
  taosArrayDestroy(pList);
9,249,846✔
838
}
9,241,371✔
839

840
int32_t buildAsyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList, SMetaData* pResultMeta) {
215,441,928✔
841
  SArray* pDbVgList = NULL;
215,441,928✔
842
  SArray* pQnodeList = NULL;
215,441,928✔
843
  FDelete fp = NULL;
215,441,928✔
844
  int32_t code = 0;
215,441,928✔
845

846
  switch (tsQueryPolicy) {
215,441,928✔
847
    case QUERY_POLICY_VNODE:
213,711,469✔
848
    case QUERY_POLICY_CLIENT: {
849
      if (pResultMeta) {
213,711,469✔
850
        pDbVgList = taosArrayInit(4, POINTER_BYTES);
213,716,989✔
851
        if (NULL == pDbVgList) {
213,718,095✔
852
          code = terrno;
×
853
          goto _return;
×
854
        }
855
        int32_t dbNum = taosArrayGetSize(pResultMeta->pDbVgroup);
213,718,095✔
856
        for (int32_t i = 0; i < dbNum; ++i) {
424,778,742✔
857
          SMetaRes* pRes = taosArrayGet(pResultMeta->pDbVgroup, i);
211,064,782✔
858
          if (pRes->code || NULL == pRes->pRes) {
211,064,512✔
859
            continue;
663✔
860
          }
861

862
          if (NULL == taosArrayPush(pDbVgList, &pRes->pRes)) {
422,133,490✔
863
            code = terrno;
×
864
            goto _return;
×
865
          }
866
        }
867
      } else {
868
        fp = freeVgList;
1,782✔
869

870
        int32_t dbNum = taosArrayGetSize(pRequest->dbList);
1,782✔
871
        if (dbNum > 0) {
1,782✔
872
          SCatalog*     pCtg = NULL;
1,782✔
873
          SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
1,782✔
874
          code = catalogGetHandle(pInst->clusterId, &pCtg);
1,782✔
875
          if (code != TSDB_CODE_SUCCESS) {
1,782✔
876
            goto _return;
×
877
          }
878

879
          pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
1,782✔
880
          if (NULL == pDbVgList) {
1,782✔
881
            code = terrno;
×
882
            goto _return;
×
883
          }
884
          SArray* pVgList = NULL;
1,782✔
885
          for (int32_t i = 0; i < dbNum; ++i) {
3,564✔
886
            char*            dbFName = taosArrayGet(pRequest->dbList, i);
1,782✔
887
            SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
1,782✔
888
                                     .requestId = pRequest->requestId,
1,782✔
889
                                     .requestObjRefId = pRequest->self,
1,782✔
890
                                     .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
1,782✔
891

892
            // catalogGetDBVgList will handle dbFName == null.
893
            code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
1,782✔
894
            if (code) {
1,782✔
895
              goto _return;
×
896
            }
897

898
            if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
1,782✔
899
              code = terrno;
×
900
              goto _return;
×
901
            }
902
          }
903
        }
904
      }
905

906
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
213,715,742✔
907
      break;
213,714,613✔
908
    }
909
    case QUERY_POLICY_HYBRID:
1,739,986✔
910
    case QUERY_POLICY_QNODE: {
911
      if (pResultMeta && taosArrayGetSize(pResultMeta->pQnodeList) > 0) {
1,774,864✔
912
        SMetaRes* pRes = taosArrayGet(pResultMeta->pQnodeList, 0);
34,878✔
913
        if (pRes->code) {
34,878✔
914
          pQnodeList = NULL;
×
915
        } else {
916
          pQnodeList = taosArrayDup((SArray*)pRes->pRes, NULL);
34,878✔
917
          if (NULL == pQnodeList) {
34,878✔
918
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
919
            goto _return;
×
920
          }
921
        }
922
      } else {
923
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
1,705,108✔
924
        TSC_ERR_JRET(taosThreadMutexLock(&pInst->qnodeMutex));
1,705,108✔
925
        if (pInst->pQnodeList) {
1,705,108✔
926
          pQnodeList = taosArrayDup(pInst->pQnodeList, NULL);
1,705,108✔
927
          if (NULL == pQnodeList) {
1,705,108✔
928
            code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
929
            goto _return;
×
930
          }
931
        }
932
        TSC_ERR_JRET(taosThreadMutexUnlock(&pInst->qnodeMutex));
1,705,108✔
933
      }
934

935
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
1,739,986✔
936
      break;
1,739,986✔
937
    }
938
    default:
600✔
939
      tscError("unknown query policy: %d", tsQueryPolicy);
600✔
940
      return TSDB_CODE_APP_ERROR;
×
941
  }
942

943
_return:
215,454,599✔
944
  taosArrayDestroyEx(pDbVgList, fp);
215,454,599✔
945
  taosArrayDestroy(pQnodeList);
215,456,331✔
946

947
  return code;
215,458,613✔
948
}
949

950
int32_t buildSyncExecNodeList(SRequestObj* pRequest, SArray** pNodeList, SArray* pMnodeList) {
9,373,012✔
951
  SArray* pDbVgList = NULL;
9,373,012✔
952
  SArray* pQnodeList = NULL;
9,373,012✔
953
  int32_t code = 0;
9,374,925✔
954

955
  switch (tsQueryPolicy) {
9,374,925✔
956
    case QUERY_POLICY_VNODE:
9,370,224✔
957
    case QUERY_POLICY_CLIENT: {
958
      int32_t dbNum = taosArrayGetSize(pRequest->dbList);
9,370,224✔
959
      if (dbNum > 0) {
9,379,636✔
960
        SCatalog*     pCtg = NULL;
9,245,848✔
961
        SAppInstInfo* pInst = pRequest->pTscObj->pAppInfo;
9,243,121✔
962
        code = catalogGetHandle(pInst->clusterId, &pCtg);
9,247,968✔
963
        if (code != TSDB_CODE_SUCCESS) {
9,243,906✔
964
          goto _return;
×
965
        }
966

967
        pDbVgList = taosArrayInit(dbNum, POINTER_BYTES);
9,243,906✔
968
        if (NULL == pDbVgList) {
9,247,246✔
969
          code = terrno;
×
970
          goto _return;
×
971
        }
972
        SArray* pVgList = NULL;
9,247,259✔
973
        for (int32_t i = 0; i < dbNum; ++i) {
18,488,986✔
974
          char*            dbFName = taosArrayGet(pRequest->dbList, i);
9,245,505✔
975
          SRequestConnInfo conn = {.pTrans = pInst->pTransporter,
9,246,573✔
976
                                   .requestId = pRequest->requestId,
9,246,753✔
977
                                   .requestObjRefId = pRequest->self,
9,246,395✔
978
                                   .mgmtEps = getEpSet_s(&pInst->mgmtEp)};
9,244,981✔
979

980
          // catalogGetDBVgList will handle dbFName == null.
981
          code = catalogGetDBVgList(pCtg, &conn, dbFName, &pVgList);
9,252,615✔
982
          if (code) {
9,247,521✔
983
            goto _return;
×
984
          }
985

986
          if (NULL == taosArrayPush(pDbVgList, &pVgList)) {
9,253,415✔
987
            code = terrno;
×
988
            goto _return;
×
989
          }
990
        }
991
      }
992

993
      code = buildVnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pDbVgList);
9,384,577✔
994
      break;
9,378,743✔
995
    }
996
    case QUERY_POLICY_HYBRID:
×
997
    case QUERY_POLICY_QNODE: {
998
      TSC_ERR_JRET(getQnodeList(pRequest, &pQnodeList));
×
999

1000
      code = buildQnodePolicyNodeList(pRequest, pNodeList, pMnodeList, pQnodeList);
×
1001
      break;
×
1002
    }
1003
    default:
4,701✔
1004
      tscError("unknown query policy: %d", tsQueryPolicy);
4,701✔
1005
      return TSDB_CODE_APP_ERROR;
×
1006
  }
1007

1008
_return:
9,377,917✔
1009

1010
  taosArrayDestroyEx(pDbVgList, freeVgList);
9,376,651✔
1011
  taosArrayDestroy(pQnodeList);
9,374,151✔
1012

1013
  return code;
9,378,876✔
1014
}
1015

1016
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
9,374,655✔
1017
  void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
9,374,655✔
1018

1019
  SExecResult      res = {0};
9,383,138✔
1020
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
9,381,729✔
1021
                           .requestId = pRequest->requestId,
9,384,265✔
1022
                           .requestObjRefId = pRequest->self};
9,376,467✔
1023
  SSchedulerReq    req = {
9,765,414✔
1024
         .syncReq = true,
1025
         .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
9,374,892✔
1026
         .pConn = &conn,
1027
         .pNodeList = pNodeList,
1028
         .pDag = pDag,
1029
         .sql = pRequest->sqlstr,
9,374,892✔
1030
         .startTs = pRequest->metric.start,
9,377,033✔
1031
         .execFp = NULL,
1032
         .cbParam = NULL,
1033
         .chkKillFp = chkRequestKilled,
1034
         .chkKillParam = (void*)pRequest->self,
9,372,335✔
1035
         .pExecRes = &res,
1036
         .source = pRequest->source,
9,374,648✔
1037
         .secureDelete = pRequest->secureDelete,
9,374,728✔
1038
         .pWorkerCb = getTaskPoolWorkerCb(),
9,381,831✔
1039
  };
1040

1041
  int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
9,376,058✔
1042

1043
  destroyQueryExecRes(&pRequest->body.resInfo.execRes);
9,385,369✔
1044
  (void)memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
9,384,190✔
1045

1046
  if (code != TSDB_CODE_SUCCESS) {
9,383,598✔
1047
    schedulerFreeJob(&pRequest->body.queryJob, 0);
×
1048

1049
    pRequest->code = code;
×
1050
    terrno = code;
×
1051
    return pRequest->code;
×
1052
  }
1053

1054
  if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
9,383,598✔
1055
      TDMT_VND_CREATE_TABLE == pRequest->type) {
102,154✔
1056
    pRequest->body.resInfo.numOfRows = res.numOfRows;
9,344,874✔
1057
    if (TDMT_VND_SUBMIT == pRequest->type) {
9,345,037✔
1058
      STscObj*            pTscObj = pRequest->pTscObj;
9,281,313✔
1059
      SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
9,281,945✔
1060
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, res.numOfRows);
9,284,333✔
1061
    }
1062

1063
    schedulerFreeJob(&pRequest->body.queryJob, 0);
9,345,556✔
1064
  }
1065

1066
  pRequest->code = res.code;
9,385,491✔
1067
  terrno = res.code;
9,386,165✔
1068
  return pRequest->code;
9,382,696✔
1069
}
1070

1071
int32_t handleSubmitExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
728,189,920✔
1072
  SArray*      pArray = NULL;
728,189,920✔
1073
  SSubmitRsp2* pRsp = (SSubmitRsp2*)res;
728,189,920✔
1074
  if (NULL == pRsp->aCreateTbRsp) {
728,189,920✔
1075
    return TSDB_CODE_SUCCESS;
713,784,416✔
1076
  }
1077

1078
  int32_t tbNum = taosArrayGetSize(pRsp->aCreateTbRsp);
14,456,119✔
1079
  for (int32_t i = 0; i < tbNum; ++i) {
32,284,488✔
1080
    SVCreateTbRsp* pTbRsp = (SVCreateTbRsp*)taosArrayGet(pRsp->aCreateTbRsp, i);
17,828,241✔
1081
    if (pTbRsp->pMeta) {
17,827,077✔
1082
      TSC_ERR_RET(handleCreateTbExecRes(pTbRsp->pMeta, pCatalog));
17,484,783✔
1083
    }
1084
  }
1085

1086
  return TSDB_CODE_SUCCESS;
14,456,247✔
1087
}
1088

1089
int32_t handleQueryExecRes(SRequestObj* pRequest, void* res, SCatalog* pCatalog, SEpSet* epset) {
171,675,879✔
1090
  int32_t code = 0;
171,675,879✔
1091
  SArray* pArray = NULL;
171,675,879✔
1092
  SArray* pTbArray = (SArray*)res;
171,675,879✔
1093
  int32_t tbNum = taosArrayGetSize(pTbArray);
171,675,879✔
1094
  if (tbNum <= 0) {
171,675,718✔
1095
    return TSDB_CODE_SUCCESS;
×
1096
  }
1097

1098
  pArray = taosArrayInit(tbNum, sizeof(STbSVersion));
171,675,718✔
1099
  if (NULL == pArray) {
171,676,848✔
1100
    return terrno;
×
1101
  }
1102

1103
  for (int32_t i = 0; i < tbNum; ++i) {
561,829,287✔
1104
    STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
390,150,669✔
1105
    if (NULL == tbInfo) {
390,150,868✔
1106
      code = terrno;
×
1107
      goto _return;
×
1108
    }
1109
    STbSVersion tbSver = {
390,150,868✔
1110
        .tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion, .rver = tbInfo->rversion};
390,150,849✔
1111
    if (NULL == taosArrayPush(pArray, &tbSver)) {
390,152,380✔
1112
      code = terrno;
×
1113
      goto _return;
×
1114
    }
1115
  }
1116

1117
  SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
171,678,618✔
1118
                           .requestId = pRequest->requestId,
171,678,768✔
1119
                           .requestObjRefId = pRequest->self,
171,678,409✔
1120
                           .mgmtEps = *epset};
1121

1122
  code = catalogChkTbMetaVersion(pCatalog, &conn, pArray);
171,678,705✔
1123

1124
_return:
171,676,276✔
1125

1126
  taosArrayDestroy(pArray);
171,676,290✔
1127
  return code;
171,676,977✔
1128
}
1129

1130
int32_t handleAlterTbExecRes(void* res, SCatalog* pCatalog) {
9,442,576✔
1131
  return catalogUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
9,442,576✔
1132
}
1133

1134
int32_t handleCreateTbExecRes(void* res, SCatalog* pCatalog) {
75,704,975✔
1135
  return catalogAsyncUpdateTableMeta(pCatalog, (STableMetaRsp*)res);
75,704,975✔
1136
}
1137

1138
int32_t handleQueryExecRsp(SRequestObj* pRequest) {
1,009,916,425✔
1139
  if (NULL == pRequest->body.resInfo.execRes.res) {
1,009,916,425✔
1140
    return pRequest->code;
57,509,034✔
1141
  }
1142

1143
  SCatalog*     pCatalog = NULL;
952,403,919✔
1144
  SAppInstInfo* pAppInfo = getAppInfo(pRequest);
952,399,566✔
1145

1146
  int32_t code = catalogGetHandle(pAppInfo->clusterId, &pCatalog);
952,435,172✔
1147
  if (code) {
952,414,641✔
1148
    return code;
×
1149
  }
1150

1151
  SEpSet       epset = getEpSet_s(&pAppInfo->mgmtEp);
952,414,641✔
1152
  SExecResult* pRes = &pRequest->body.resInfo.execRes;
952,464,085✔
1153

1154
  switch (pRes->msgType) {
952,467,174✔
1155
    case TDMT_VND_ALTER_TABLE:
4,278,241✔
1156
    case TDMT_MND_ALTER_STB: {
1157
      code = handleAlterTbExecRes(pRes->res, pCatalog);
4,278,241✔
1158
      break;
4,278,241✔
1159
    }
1160
    case TDMT_VND_CREATE_TABLE: {
47,826,711✔
1161
      SArray* pList = (SArray*)pRes->res;
47,826,711✔
1162
      int32_t num = taosArrayGetSize(pList);
47,827,642✔
1163
      for (int32_t i = 0; i < num; ++i) {
103,773,991✔
1164
        void* res = taosArrayGetP(pList, i);
55,945,699✔
1165
        // handleCreateTbExecRes will handle res == null
1166
        code = handleCreateTbExecRes(res, pCatalog);
55,947,193✔
1167
      }
1168
      break;
47,828,292✔
1169
    }
1170
    case TDMT_MND_CREATE_STB: {
434,368✔
1171
      code = handleCreateTbExecRes(pRes->res, pCatalog);
434,368✔
1172
      break;
434,368✔
1173
    }
1174
    case TDMT_VND_SUBMIT: {
728,190,984✔
1175
      (void)atomic_add_fetch_64((int64_t*)&pAppInfo->summary.insertBytes, pRes->numOfBytes);
728,190,984✔
1176

1177
      code = handleSubmitExecRes(pRequest, pRes->res, pCatalog, &epset);
728,248,740✔
1178
      break;
728,228,302✔
1179
    }
1180
    case TDMT_SCH_QUERY:
171,677,005✔
1181
    case TDMT_SCH_MERGE_QUERY: {
1182
      code = handleQueryExecRes(pRequest, pRes->res, pCatalog, &epset);
171,677,005✔
1183
      break;
171,675,020✔
1184
    }
1185
    default:
530✔
1186
      tscError("req:0x%" PRIx64 ", invalid exec result for request type:%d, QID:0x%" PRIx64, pRequest->self,
530✔
1187
               pRequest->type, pRequest->requestId);
1188
      code = TSDB_CODE_APP_ERROR;
×
1189
  }
1190

1191
  return code;
952,444,223✔
1192
}
1193

1194
static bool incompletaFileParsing(SNode* pStmt) {
986,441,439✔
1195
  return QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pStmt) ? false : ((SVnodeModifyOpStmt*)pStmt)->fileProcessing;
986,441,439✔
1196
}
1197

1198
void continuePostSubQuery(SRequestObj* pRequest, SSDataBlock* pBlock) {
×
1199
  SSqlCallbackWrapper* pWrapper = pRequest->pWrapper;
×
1200

1201
  int32_t code = nodesAcquireAllocator(pWrapper->pParseCtx->allocatorId);
×
1202
  if (TSDB_CODE_SUCCESS == code) {
×
1203
    int64_t analyseStart = taosGetTimestampUs();
×
1204
    code = qContinueParsePostQuery(pWrapper->pParseCtx, pRequest->pQuery, pBlock);
×
1205
    pRequest->metric.analyseCostUs += taosGetTimestampUs() - analyseStart;
×
1206
  }
1207

1208
  if (TSDB_CODE_SUCCESS == code) {
×
1209
    code = qContinuePlanPostQuery(pRequest->pPostPlan);
×
1210
  }
1211

1212
  code = nodesReleaseAllocator(pWrapper->pParseCtx->allocatorId);
×
1213
  handleQueryAnslyseRes(pWrapper, NULL, code);
×
1214
}
×
1215

1216
void returnToUser(SRequestObj* pRequest) {
80,668,546✔
1217
  if (pRequest->relation.userRefId == pRequest->self || 0 == pRequest->relation.userRefId) {
80,668,546✔
1218
    // return to client
1219
    doRequestCallback(pRequest, pRequest->code);
80,668,546✔
1220
    return;
80,666,694✔
1221
  }
1222

1223
  SRequestObj* pUserReq = acquireRequest(pRequest->relation.userRefId);
×
1224
  if (pUserReq) {
×
1225
    pUserReq->code = pRequest->code;
×
1226
    // return to client
1227
    doRequestCallback(pUserReq, pUserReq->code);
×
1228
    (void)releaseRequest(pRequest->relation.userRefId);
×
1229
    return;
×
1230
  } else {
1231
    tscError("req:0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1232
             pRequest->relation.userRefId, pRequest->requestId);
1233
  }
1234
}
1235

1236
static int32_t createResultBlock(TAOS_RES* pRes, int32_t numOfRows, SSDataBlock** pBlock) {
×
1237
  int64_t     lastTs = 0;
×
1238
  TAOS_FIELD* pResFields = taos_fetch_fields(pRes);
×
1239
  int32_t     numOfFields = taos_num_fields(pRes);
×
1240

1241
  int32_t code = createDataBlock(pBlock);
×
1242
  if (code) {
×
1243
    return code;
×
1244
  }
1245

1246
  for (int32_t i = 0; i < numOfFields; ++i) {
×
1247
    SColumnInfoData colInfoData = createColumnInfoData(pResFields[i].type, pResFields[i].bytes, i + 1);
×
1248
    code = blockDataAppendColInfo(*pBlock, &colInfoData);
×
1249
    if (TSDB_CODE_SUCCESS != code) {
×
1250
      blockDataDestroy(*pBlock);
×
1251
      return code;
×
1252
    }
1253
  }
1254

1255
  code = blockDataEnsureCapacity(*pBlock, numOfRows);
×
1256
  if (TSDB_CODE_SUCCESS != code) {
×
1257
    blockDataDestroy(*pBlock);
×
1258
    return code;
×
1259
  }
1260

1261
  for (int32_t i = 0; i < numOfRows; ++i) {
×
1262
    TAOS_ROW pRow = taos_fetch_row(pRes);
×
1263
    if (NULL == pRow[0] || NULL == pRow[1] || NULL == pRow[2]) {
×
1264
      tscError("invalid data from vnode");
×
1265
      blockDataDestroy(*pBlock);
×
1266
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
1267
    }
1268
    int64_t ts = *(int64_t*)pRow[0];
×
1269
    if (lastTs < ts) {
×
1270
      lastTs = ts;
×
1271
    }
1272

1273
    for (int32_t j = 0; j < numOfFields; ++j) {
×
1274
      SColumnInfoData* pColInfoData = taosArrayGet((*pBlock)->pDataBlock, j);
×
1275
      code = colDataSetVal(pColInfoData, i, pRow[j], false);
×
1276
      if (TSDB_CODE_SUCCESS != code) {
×
1277
        blockDataDestroy(*pBlock);
×
1278
        return code;
×
1279
      }
1280
    }
1281

1282
    tscInfo("[create stream with histroy] lastKey:%" PRId64 " vgId:%d, vgVer:%" PRId64, ts, *(int32_t*)pRow[1],
×
1283
            *(int64_t*)pRow[2]);
1284
  }
1285

1286
  (*pBlock)->info.window.ekey = lastTs;
×
1287
  (*pBlock)->info.rows = numOfRows;
×
1288

1289
  tscInfo("[create stream with histroy] lastKey:%" PRId64 " numOfRows:%d from all vgroups", lastTs, numOfRows);
×
1290
  return TSDB_CODE_SUCCESS;
×
1291
}
1292

1293
void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
×
1294
  SRequestObj* pRequest = (SRequestObj*)res;
×
1295
  if (pRequest->code) {
×
1296
    returnToUser(pRequest);
×
1297
    return;
×
1298
  }
1299

1300
  SSDataBlock* pBlock = NULL;
×
1301
  pRequest->code = createResultBlock(res, rowNum, &pBlock);
×
1302
  if (TSDB_CODE_SUCCESS != pRequest->code) {
×
1303
    tscError("req:0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
×
1304
             tstrerror(pRequest->code));
1305
    returnToUser(pRequest);
×
1306
    return;
×
1307
  }
1308

1309
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1310
  if (pNextReq) {
×
1311
    continuePostSubQuery(pNextReq, pBlock);
×
1312
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1313
  } else {
1314
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1315
             pRequest->relation.nextRefId, pRequest->requestId);
1316
  }
1317

1318
  blockDataDestroy(pBlock);
×
1319
}
1320

1321
void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
×
1322
  SRequestObj* pRequest = pWrapper->pRequest;
×
1323
  if (TD_RES_QUERY(pRequest)) {
×
1324
    taosAsyncFetchImpl(pRequest, postSubQueryFetchCb, pWrapper);
×
1325
    return;
×
1326
  }
1327

1328
  SRequestObj* pNextReq = acquireRequest(pRequest->relation.nextRefId);
×
1329
  if (pNextReq) {
×
1330
    continuePostSubQuery(pNextReq, NULL);
×
1331
    (void)releaseRequest(pRequest->relation.nextRefId);
×
1332
  } else {
1333
    tscError("req:0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
×
1334
             pRequest->relation.nextRefId, pRequest->requestId);
1335
  }
1336
}
1337

1338
// todo refacto the error code  mgmt
1339
void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
1,000,175,940✔
1340
  SSqlCallbackWrapper* pWrapper = param;
1,000,175,940✔
1341
  SRequestObj*         pRequest = pWrapper->pRequest;
1,000,175,940✔
1342
  STscObj*             pTscObj = pRequest->pTscObj;
1,000,213,199✔
1343

1344
  // Note: This is EXECUTE completion callback, not FETCH callback.
1345
  // Scheduler job phase is authoritative. Client phase is only fallback.
1346
  // Let heartbeat read scheduler job phase via schedulerGetJobPhase().
1347

1348
  pRequest->code = code;
1,000,221,396✔
1349
  if (pResult) {
1,000,222,709✔
1350
    destroyQueryExecRes(&pRequest->body.resInfo.execRes);
1,000,130,970✔
1351
    (void)memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
1,000,141,061✔
1352
  }
1353

1354
  int32_t type = pRequest->type;
1,000,170,181✔
1355
  if (TDMT_VND_SUBMIT == type || TDMT_VND_DELETE == type || TDMT_VND_CREATE_TABLE == type) {
1,000,144,467✔
1356
    if (pResult) {
771,106,159✔
1357
      pRequest->body.resInfo.numOfRows += pResult->numOfRows;
771,084,217✔
1358

1359
      // record the insert rows
1360
      if (TDMT_VND_SUBMIT == type) {
771,123,318✔
1361
        SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
719,578,040✔
1362
        (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertRows, pResult->numOfRows);
719,582,046✔
1363
      }
1364
    }
1365
    schedulerFreeJob(&pRequest->body.queryJob, 0);
771,151,661✔
1366
  }
1367

1368
  taosMemoryFree(pResult);
1,000,216,987✔
1369
  tscDebug("req:0x%" PRIx64 ", enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
1,000,171,198✔
1370
           pRequest->requestId);
1371

1372
  if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL &&
1,000,150,402✔
1373
      pRequest->stmtBindVersion == 0) {
86,176✔
1374
    tscDebug("req:0x%" PRIx64 ", client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64,
85,780✔
1375
             pRequest->self, tstrerror(code), pRequest->retry, pRequest->requestId);
1376
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
85,780✔
1377
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1378
    }
1379
    restartAsyncQuery(pRequest, code);
85,780✔
1380
    return;
85,780✔
1381
  }
1382

1383
  tscTrace("req:0x%" PRIx64 ", scheduler exec cb, request type:%s", pRequest->self, TMSG_INFO(pRequest->type));
1,000,064,622✔
1384
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
1,000,064,622✔
1385
    if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
4,109,975✔
1386
      tscError("req:0x%" PRIx64 ", remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
×
1387
    }
1388
  }
1389

1390
  pRequest->metric.execCostUs = taosGetTimestampUs() - pRequest->metric.execStart;
1,000,092,711✔
1391

1392
  int32_t code1 = handleQueryExecRsp(pRequest);
1,000,128,975✔
1393
  if (pRequest->code == TSDB_CODE_SUCCESS && pRequest->code != code1) {
1,000,112,833✔
1394
    pRequest->code = code1;
×
1395
  }
1396

1397
  if (pRequest->code == TSDB_CODE_SUCCESS && NULL != pRequest->pQuery &&
1,986,585,280✔
1398
      incompletaFileParsing(pRequest->pQuery->pRoot)) {
986,430,813✔
1399
    continueInsertFromCsv(pWrapper, pRequest);
13,576✔
1400
    return;
13,576✔
1401
  }
1402

1403
  if (pRequest->relation.nextRefId) {
1,000,145,132✔
1404
    handlePostSubQuery(pWrapper);
×
1405
  } else {
1406
    destorySqlCallbackWrapper(pWrapper);
1,000,128,937✔
1407
    pRequest->pWrapper = NULL;
1,000,079,249✔
1408

1409
    // return to client
1410
    doRequestCallback(pRequest, code);
1,000,098,319✔
1411
  }
1412
}
1413

1414
void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res) {
9,834,126✔
1415
  int32_t code = 0;
9,834,126✔
1416
  int32_t subplanNum = 0;
9,834,126✔
1417

1418
  if (pQuery->pRoot) {
9,834,126✔
1419
    pRequest->stmtType = pQuery->pRoot->type;
9,382,688✔
1420
    if (nodeType(pQuery->pRoot) == QUERY_NODE_DELETE_STMT) {
9,379,104✔
1421
      pRequest->secureDelete = ((SDeleteStmt*)pQuery->pRoot)->secureDelete;
×
1422
    }
1423
  }
1424

1425
  if (pQuery->pRoot && !pRequest->inRetry) {
9,838,275✔
1426
    STscObj*            pTscObj = pRequest->pTscObj;
9,379,298✔
1427
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
9,382,461✔
1428
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type) {
9,383,906✔
1429
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
9,358,544✔
1430
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
23,038✔
1431
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
23,046✔
1432
    }
1433
  }
1434

1435
  pRequest->body.execMode = pQuery->execMode;
9,846,202✔
1436
  switch (pQuery->execMode) {
9,836,122✔
1437
    case QUERY_EXEC_MODE_LOCAL:
×
1438
      if (!pRequest->validateOnly) {
×
1439
        if (NULL == pQuery->pRoot) {
×
1440
          terrno = TSDB_CODE_INVALID_PARA;
×
1441
          code = terrno;
×
1442
        } else {
1443
          code = execLocalCmd(pRequest, pQuery);
×
1444
        }
1445
      }
1446
      break;
×
1447
    case QUERY_EXEC_MODE_RPC:
460,015✔
1448
      if (!pRequest->validateOnly) {
460,015✔
1449
        code = execDdlQuery(pRequest, pQuery);
460,015✔
1450
      }
1451
      break;
460,015✔
1452
    case QUERY_EXEC_MODE_SCHEDULE: {
9,371,075✔
1453
      SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
9,371,075✔
1454
      if (NULL == pMnodeList) {
9,375,778✔
1455
        code = terrno;
×
1456
        break;
×
1457
      }
1458
      SQueryPlan* pDag = NULL;
9,375,778✔
1459
      code = getPlan(pRequest, pQuery, &pDag, pMnodeList);
9,375,778✔
1460
      if (TSDB_CODE_SUCCESS == code) {
9,378,896✔
1461
        pRequest->body.subplanNum = pDag->numOfSubplans;
9,381,205✔
1462
        if (!pRequest->validateOnly) {
9,378,081✔
1463
          SArray* pNodeList = NULL;
9,375,201✔
1464
          code = buildSyncExecNodeList(pRequest, &pNodeList, pMnodeList);
9,376,726✔
1465

1466
          if (TSDB_CODE_SUCCESS == code) {
9,376,663✔
1467
            code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
9,381,517✔
1468
                                        taosArrayGetSize(pNodeList));
9,379,373✔
1469
          }
1470

1471
          if (TSDB_CODE_SUCCESS == code) {
9,375,171✔
1472
            code = scheduleQuery(pRequest, pDag, pNodeList);
9,375,171✔
1473
          }
1474
          taosArrayDestroy(pNodeList);
9,381,962✔
1475
        }
1476
      }
1477
      taosArrayDestroy(pMnodeList);
9,378,101✔
1478
      break;
9,382,915✔
1479
    }
1480
    case QUERY_EXEC_MODE_EMPTY_RESULT:
×
1481
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
1482
      break;
×
1483
    default:
×
1484
      break;
×
1485
  }
1486

1487
  if (!keepQuery) {
9,844,031✔
1488
    qDestroyQuery(pQuery);
×
1489
  }
1490

1491
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
9,844,031✔
1492
    int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
27,421✔
1493
    if (TSDB_CODE_SUCCESS != ret) {
27,421✔
1494
      tscError("req:0x%" PRIx64 ", remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret,
×
1495
               pRequest->requestId);
1496
    }
1497
  }
1498

1499
  if (TSDB_CODE_SUCCESS == code) {
9,842,757✔
1500
    code = handleQueryExecRsp(pRequest);
9,840,175✔
1501
  }
1502

1503
  if (TSDB_CODE_SUCCESS != code) {
9,845,705✔
1504
    pRequest->code = code;
8,335✔
1505
  }
1506

1507
  if (res) {
9,845,705✔
1508
    *res = pRequest->body.resInfo.execRes.res;
×
1509
    pRequest->body.resInfo.execRes.res = NULL;
×
1510
  }
1511
}
9,845,705✔
1512

1513
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
1,000,573,628✔
1514
                                 SSqlCallbackWrapper* pWrapper) {
1515
  int32_t code = TSDB_CODE_SUCCESS;
1,000,573,628✔
1516
  pRequest->type = pQuery->msgType;
1,000,573,628✔
1517
  SArray*     pMnodeList = NULL;
1,000,625,628✔
1518
  SArray*     pNodeList = NULL;
1,000,625,628✔
1519
  SQueryPlan* pDag = NULL;
1,000,614,623✔
1520
  int64_t     st = taosGetTimestampUs();
1,000,628,478✔
1521

1522
  if (!pRequest->parseOnly) {
1,000,628,478✔
1523
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_PLAN);
2,001,289,043✔
1524

1525
    pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad));
1,000,791,619✔
1526
    if (NULL == pMnodeList) {
1,000,539,325✔
1527
      code = terrno;
×
1528
    }
1529
    SPlanContext cxt = {.queryId = pRequest->requestId,
1,092,879,517✔
1530
                        .acctId = pRequest->pTscObj->acctId,
1,000,635,517✔
1531
                        .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
1,000,686,675✔
1532
                        .pAstRoot = pQuery->pRoot,
1,000,704,311✔
1533
                        .showRewrite = pQuery->showRewrite,
1,000,718,009✔
1534
                        .isView = pWrapper->pParseCtx->isView,
1,000,709,685✔
1535
                        .isAudit = pWrapper->pParseCtx->isAudit,
1,000,686,720✔
1536
                        .privInfo = pWrapper->pParseCtx->privInfo,
1,000,628,052✔
1537
                        .pMsg = pRequest->msgBuf,
1,000,688,891✔
1538
                        .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
1539
                        .pUser = pRequest->pTscObj->user,
1,000,662,531✔
1540
                        .userId = pRequest->pTscObj->userId,
1,000,654,031✔
1541
                        .sysInfo = pRequest->pTscObj->sysInfo,
1,000,584,724✔
1542
                        .timezone = pRequest->pTscObj->optionInfo.timezone,
1,000,663,227✔
1543
                        .allocatorId = pRequest->stmtBindVersion > 0 ? 0 : pRequest->allocatorRefId};
1,000,600,130✔
1544
    if (TSDB_CODE_SUCCESS == code) {
1,000,658,607✔
1545
      code = qCreateQueryPlan(&cxt, &pDag, pMnodeList);
1,000,641,120✔
1546
    }
1547
    if (code) {
1,000,511,604✔
1548
      tscError("req:0x%" PRIx64 " requestId:0x%" PRIx64 ", failed to create query plan, code:%s msg:%s", pRequest->self,
281,457✔
1549
               pRequest->requestId, tstrerror(code), cxt.pMsg);
1550
    } else {
1551
      pRequest->body.subplanNum = pDag->numOfSubplans;
1,000,230,147✔
1552
      TSWAP(pRequest->pPostPlan, pDag->pPostPlan);
1,000,386,780✔
1553
    }
1554
  }
1555

1556
  pRequest->metric.execStart = taosGetTimestampUs();
1,000,608,843✔
1557
  pRequest->metric.planCostUs = pRequest->metric.execStart - st;
1,000,509,263✔
1558

1559
  if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) {
1,000,653,013✔
1560
    if (QUERY_NODE_VNODE_MODIFY_STMT != nodeType(pQuery->pRoot)) {
1,000,063,389✔
1561
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_SCHEDULE);
430,924,094✔
1562
      code = buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta);
215,467,464✔
1563
    }
1564

1565
    if (code == TSDB_CODE_SUCCESS) {
1,000,159,592✔
1566
      code = sessMetricCheckValue((SSessMetric*)pRequest->pTscObj->pSessMetric, SESSION_MAX_CALL_VNODE_NUM,
1,000,139,693✔
1567
                                  taosArrayGetSize(pNodeList));
1,000,106,214✔
1568
    }
1569

1570
    if (code == TSDB_CODE_SUCCESS) {
1,000,143,900✔
1571
      SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter,
1,000,140,120✔
1572
                               .requestId = pRequest->requestId,
1,000,148,760✔
1573
                               .requestObjRefId = pRequest->self};
1,000,113,562✔
1574
      SSchedulerReq    req = {
1,046,316,413✔
1575
             .syncReq = false,
1576
             .localReq = (tsQueryPolicy == QUERY_POLICY_CLIENT),
1,000,125,925✔
1577
             .pConn = &conn,
1578
             .pNodeList = pNodeList,
1579
             .pDag = pDag,
1580
             .allocatorRefId = pRequest->allocatorRefId,
1,000,125,925✔
1581
             .sql = pRequest->sqlstr,
1,000,067,777✔
1582
             .startTs = pRequest->metric.start,
1,000,074,263✔
1583
             .execFp = schedulerExecCb,
1584
             .cbParam = pWrapper,
1585
             .chkKillFp = chkRequestKilled,
1586
             .chkKillParam = (void*)pRequest->self,
1,000,116,743✔
1587
             .pExecRes = NULL,
1588
             .source = pRequest->source,
1,000,085,796✔
1589
             .secureDelete = pRequest->secureDelete,
1,000,121,682✔
1590
             .pWorkerCb = getTaskPoolWorkerCb(),
1,000,097,824✔
1591
      };
1592

1593
      if (TSDB_CODE_SUCCESS == code) {
1,000,010,694✔
1594
        CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_EXECUTE);
2,000,361,901✔
1595
        code = schedulerExecJob(&req, &pRequest->body.queryJob);
1,000,156,726✔
1596
      }
1597
      taosArrayDestroy(pNodeList);
1,000,004,431✔
1598
      taosArrayDestroy(pMnodeList);
1,000,164,090✔
1599
      return code;
1,000,183,635✔
1600
    }
1601
  }
1602

1603
  qDestroyQueryPlan(pDag);
632,224✔
1604
  tscDebug("req:0x%" PRIx64 ", plan not executed, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code),
496,945✔
1605
           pRequest->requestId);
1606
  destorySqlCallbackWrapper(pWrapper);
496,945✔
1607
  pRequest->pWrapper = NULL;
496,945✔
1608
  if (TSDB_CODE_SUCCESS != code) {
496,945✔
1609
    pRequest->code = code;
285,237✔
1610
  }
1611

1612
  doRequestCallback(pRequest, code);
496,945✔
1613

1614
  // todo not to be released here
1615
  taosArrayDestroy(pMnodeList);
496,945✔
1616
  taosArrayDestroy(pNodeList);
496,945✔
1617

1618
  return code;
493,943✔
1619
}
1620

1621
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper) {
1,119,423,657✔
1622
  int32_t code = 0;
1,119,423,657✔
1623

1624
  if (pRequest->parseOnly) {
1,119,423,657✔
1625
    doRequestCallback(pRequest, 0);
328,217✔
1626
    return;
328,217✔
1627
  }
1628

1629
  pRequest->body.execMode = pQuery->execMode;
1,119,175,969✔
1630
  if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
1,119,188,091✔
1631
    destorySqlCallbackWrapper(pWrapper);
118,563,054✔
1632
    pRequest->pWrapper = NULL;
118,564,610✔
1633
  }
1634

1635
  if (pQuery->pRoot && !pRequest->inRetry) {
1,119,159,367✔
1636
    STscObj*            pTscObj = pRequest->pTscObj;
1,119,134,541✔
1637
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
1,119,231,152✔
1638
    if (QUERY_NODE_VNODE_MODIFY_STMT == pQuery->pRoot->type &&
1,119,221,017✔
1639
        (0 == ((SVnodeModifyOpStmt*)pQuery->pRoot)->sqlNodeType)) {
784,735,073✔
1640
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfInsertsReq, 1);
719,087,684✔
1641
    } else if (QUERY_NODE_SELECT_STMT == pQuery->pRoot->type) {
400,160,681✔
1642
      (void)atomic_add_fetch_64((int64_t*)&pActivity->numOfQueryReq, 1);
162,221,923✔
1643
    }
1644
  }
1645

1646
  switch (pQuery->execMode) {
1,119,288,887✔
1647
    case QUERY_EXEC_MODE_LOCAL:
6,853,327✔
1648
      asyncExecLocalCmd(pRequest, pQuery);
6,853,327✔
1649
      break;
6,853,344✔
1650
    case QUERY_EXEC_MODE_RPC:
111,048,393✔
1651
      code = asyncExecDdlQuery(pRequest, pQuery);
111,048,393✔
1652
      break;
111,044,131✔
1653
    case QUERY_EXEC_MODE_SCHEDULE: {
1,000,660,072✔
1654
      code = asyncExecSchQuery(pRequest, pQuery, pResultMeta, pWrapper);
1,000,660,072✔
1655
      break;
1,000,635,679✔
1656
    }
1657
    case QUERY_EXEC_MODE_EMPTY_RESULT:
657,860✔
1658
      pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
657,860✔
1659
      doRequestCallback(pRequest, 0);
657,860✔
1660
      break;
657,860✔
1661
    default:
×
1662
      tscError("req:0x%" PRIx64 ", invalid execMode %d", pRequest->self, pQuery->execMode);
×
1663
      doRequestCallback(pRequest, -1);
×
1664
      break;
×
1665
  }
1666
}
1667

1668
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
14,776✔
1669
  SCatalog* pCatalog = NULL;
14,776✔
1670
  int32_t   code = 0;
14,776✔
1671
  int32_t   dbNum = taosArrayGetSize(pRequest->dbList);
14,776✔
1672
  int32_t   tblNum = taosArrayGetSize(pRequest->tableList);
14,776✔
1673

1674
  if (dbNum <= 0 && tblNum <= 0) {
14,776✔
1675
    return TSDB_CODE_APP_ERROR;
13,984✔
1676
  }
1677

1678
  code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
792✔
1679
  if (code != TSDB_CODE_SUCCESS) {
792✔
1680
    return code;
×
1681
  }
1682

1683
  SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
792✔
1684
                           .requestId = pRequest->requestId,
792✔
1685
                           .requestObjRefId = pRequest->self,
792✔
1686
                           .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
792✔
1687

1688
  for (int32_t i = 0; i < dbNum; ++i) {
1,584✔
1689
    char* dbFName = taosArrayGet(pRequest->dbList, i);
792✔
1690

1691
    // catalogRefreshDBVgInfo will handle dbFName == null.
1692
    code = catalogRefreshDBVgInfo(pCatalog, &conn, dbFName);
792✔
1693
    if (code != TSDB_CODE_SUCCESS) {
792✔
1694
      return code;
×
1695
    }
1696
  }
1697

1698
  for (int32_t i = 0; i < tblNum; ++i) {
2,178✔
1699
    SName* tableName = taosArrayGet(pRequest->tableList, i);
1,584✔
1700

1701
    // catalogRefreshTableMeta will handle tableName == null.
1702
    code = catalogRefreshTableMeta(pCatalog, &conn, tableName, -1);
1,584✔
1703
    if (code != TSDB_CODE_SUCCESS) {
1,584✔
1704
      return code;
198✔
1705
    }
1706
  }
1707

1708
  return code;
594✔
1709
}
1710

1711
int32_t removeMeta(STscObj* pTscObj, SArray* tbList, bool isView) {
5,713,357✔
1712
  SCatalog* pCatalog = NULL;
5,713,357✔
1713
  int32_t   tbNum = taosArrayGetSize(tbList);
5,713,357✔
1714
  int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
5,713,360✔
1715
  if (code != TSDB_CODE_SUCCESS) {
5,713,360✔
1716
    return code;
×
1717
  }
1718

1719
  if (isView) {
5,713,360✔
1720
    for (int32_t i = 0; i < tbNum; ++i) {
777,980✔
1721
      SName* pViewName = taosArrayGet(tbList, i);
388,990✔
1722
      char   dbFName[TSDB_DB_FNAME_LEN];
385,451✔
1723
      if (NULL == pViewName) {
388,990✔
1724
        continue;
×
1725
      }
1726
      (void)tNameGetFullDbName(pViewName, dbFName);
388,990✔
1727
      TSC_ERR_RET(catalogRemoveViewMeta(pCatalog, dbFName, 0, pViewName->tname, 0));
388,990✔
1728
    }
1729
  } else {
1730
    for (int32_t i = 0; i < tbNum; ++i) {
8,468,014✔
1731
      SName* pTbName = taosArrayGet(tbList, i);
3,143,644✔
1732
      TSC_ERR_RET(catalogRemoveTableMeta(pCatalog, pTbName));
3,143,644✔
1733
    }
1734
  }
1735

1736
  return TSDB_CODE_SUCCESS;
5,713,360✔
1737
}
1738

1739
int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
94,655,338✔
1740
  pEpSet->version = 0;
94,655,338✔
1741

1742
  // init mnode ip set
1743
  SEpSet* mgmtEpSet = &(pEpSet->epSet);
94,655,495✔
1744
  mgmtEpSet->numOfEps = 0;
94,655,698✔
1745
  mgmtEpSet->inUse = 0;
94,655,473✔
1746

1747
  if (firstEp && firstEp[0] != 0) {
94,655,613✔
1748
    if (strlen(firstEp) >= TSDB_EP_LEN) {
94,657,166✔
1749
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1750
      return -1;
×
1751
    }
1752

1753
    int32_t code = taosGetFqdnPortFromEp(firstEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
94,657,166✔
1754
    if (code != TSDB_CODE_SUCCESS) {
94,657,007✔
1755
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1756
      return terrno;
×
1757
    }
1758
    // uint32_t addr = 0;
1759
    SIpAddr addr = {0};
94,657,007✔
1760
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
94,657,007✔
1761
    if (code) {
94,655,387✔
1762
      tscError("failed to resolve firstEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
159✔
1763
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1764
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
173✔
1765
    } else {
1766
      mgmtEpSet->numOfEps++;
94,657,111✔
1767
    }
1768
  }
1769

1770
  if (secondEp && secondEp[0] != 0) {
94,655,797✔
1771
    if (strlen(secondEp) >= TSDB_EP_LEN) {
2,121,916✔
1772
      terrno = TSDB_CODE_TSC_INVALID_FQDN;
×
1773
      return terrno;
×
1774
    }
1775

1776
    int32_t code = taosGetFqdnPortFromEp(secondEp, &mgmtEpSet->eps[mgmtEpSet->numOfEps]);
2,121,916✔
1777
    if (code != TSDB_CODE_SUCCESS) {
2,121,916✔
1778
      return code;
×
1779
    }
1780
    SIpAddr addr = {0};
2,121,916✔
1781
    code = taosGetIpFromFqdn(tsEnableIpv6, mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn, &addr);
2,121,916✔
1782
    if (code) {
2,121,860✔
1783
      tscError("failed to resolve secondEp fqdn: %s, code:%s", mgmtEpSet->eps[mgmtEpSet->numOfEps].fqdn,
×
1784
               tstrerror(TSDB_CODE_TSC_INVALID_FQDN));
1785
      (void)memset(&(mgmtEpSet->eps[mgmtEpSet->numOfEps]), 0, sizeof(mgmtEpSet->eps[mgmtEpSet->numOfEps]));
×
1786
    } else {
1787
      mgmtEpSet->numOfEps++;
2,121,860✔
1788
    }
1789
  }
1790

1791
  if (mgmtEpSet->numOfEps == 0) {
94,655,458✔
1792
    terrno = TSDB_CODE_RPC_NETWORK_UNAVAIL;
173✔
1793
    return TSDB_CODE_RPC_NETWORK_UNAVAIL;
173✔
1794
  }
1795

1796
  return 0;
94,655,568✔
1797
}
1798

1799
int32_t taosConnectImpl(const char* user, const char* auth, int32_t totpCode, const char* db, __taos_async_fn_t fp,
94,657,359✔
1800
                        void* param, SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj) {
1801
  *pTscObj = NULL;
94,657,359✔
1802
  int32_t code = createTscObj(user, auth, db, connType, pAppInfo, pTscObj);
94,657,359✔
1803
  if (TSDB_CODE_SUCCESS != code) {
94,658,072✔
1804
    return code;
×
1805
  }
1806

1807
  SRequestObj* pRequest = NULL;
94,658,072✔
1808
  code = createRequest((*pTscObj)->id, TDMT_MND_CONNECT, 0, &pRequest);
94,658,072✔
1809
  if (TSDB_CODE_SUCCESS != code) {
94,657,337✔
1810
    destroyTscObj(*pTscObj);
×
1811
    return code;
×
1812
  }
1813

1814
  pRequest->sqlstr = taosStrdup("taos_connect");
94,657,337✔
1815
  if (pRequest->sqlstr) {
94,656,937✔
1816
    pRequest->sqlLen = strlen(pRequest->sqlstr);
94,656,712✔
1817
  } else {
1818
    return terrno;
×
1819
  }
1820

1821
  SMsgSendInfo* body = NULL;
94,656,937✔
1822
  code = buildConnectMsg(pRequest, &body, totpCode);
94,656,937✔
1823
  if (TSDB_CODE_SUCCESS != code) {
94,655,986✔
1824
    destroyTscObj(*pTscObj);
×
1825
    return code;
×
1826
  }
1827

1828
  // int64_t transporterId = 0;
1829
  SEpSet epset = getEpSet_s(&(*pTscObj)->pAppInfo->mgmtEp);
94,655,986✔
1830
  code = asyncSendMsgToServer((*pTscObj)->pAppInfo->pTransporter, &epset, NULL, body);
94,657,409✔
1831
  if (TSDB_CODE_SUCCESS != code) {
94,655,027✔
1832
    destroyTscObj(*pTscObj);
×
1833
    tscError("failed to send connect msg to server, code:%s", tstrerror(code));
×
1834
    return code;
×
1835
  }
1836
  if (TSDB_CODE_SUCCESS != tsem_wait(&pRequest->body.rspSem)) {
94,655,027✔
1837
    destroyTscObj(*pTscObj);
×
1838
    tscError("failed to wait sem, code:%s", terrstr());
×
1839
    return terrno;
×
1840
  }
1841
  if (pRequest->code != TSDB_CODE_SUCCESS) {
94,656,567✔
1842
    const char* errorMsg = (code == TSDB_CODE_RPC_FQDN_ERROR) ? taos_errstr(pRequest) : tstrerror(pRequest->code);
19,627✔
1843
    tscError("failed to connect to server, reason: %s", errorMsg);
19,627✔
1844

1845
    terrno = pRequest->code;
19,627✔
1846
    destroyRequest(pRequest);
19,627✔
1847
    taos_close_internal(*pTscObj);
19,627✔
1848
    *pTscObj = NULL;
19,627✔
1849
    return terrno;
19,627✔
1850
  }
1851
  if (connType == CONN_TYPE__AUTH_TEST) {
94,636,940✔
1852
    terrno = TSDB_CODE_SUCCESS;
×
1853
    destroyRequest(pRequest);
×
1854
    taos_close_internal(*pTscObj);
×
1855
    *pTscObj = NULL;
954✔
1856
    return TSDB_CODE_SUCCESS;
954✔
1857
  }
1858

1859
  tscInfo("conn:0x%" PRIx64 ", connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
94,636,940✔
1860
          (*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
1861
  destroyRequest(pRequest);
94,639,355✔
1862
  return code;
94,635,685✔
1863
}
1864

1865
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo, int32_t totpCode) {
94,656,500✔
1866
  *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
94,656,500✔
1867
  if (*pMsgSendInfo == NULL) {
94,657,168✔
1868
    return terrno;
×
1869
  }
1870

1871
  (*pMsgSendInfo)->msgType = TDMT_MND_CONNECT;
94,657,168✔
1872

1873
  (*pMsgSendInfo)->requestObjRefId = pRequest->self;
94,657,168✔
1874
  (*pMsgSendInfo)->requestId = pRequest->requestId;
94,657,168✔
1875
  (*pMsgSendInfo)->fp = getMsgRspHandle((*pMsgSendInfo)->msgType);
94,657,168✔
1876
  (*pMsgSendInfo)->param = taosMemoryCalloc(1, sizeof(pRequest->self));
94,657,389✔
1877
  if (NULL == (*pMsgSendInfo)->param) {
94,657,990✔
1878
    taosMemoryFree(*pMsgSendInfo);
×
1879
    return terrno;
×
1880
  }
1881

1882
  *(int64_t*)(*pMsgSendInfo)->param = pRequest->self;
94,657,765✔
1883

1884
  SConnectReq connectReq = {0};
94,657,765✔
1885
  STscObj*    pObj = pRequest->pTscObj;
94,657,765✔
1886

1887
  char* db = getDbOfConnection(pObj);
94,657,765✔
1888
  if (db != NULL) {
94,656,844✔
1889
    tstrncpy(connectReq.db, db, sizeof(connectReq.db));
475,833✔
1890
  } else if (terrno) {
94,181,011✔
1891
    taosMemoryFree(*pMsgSendInfo);
×
1892
    return terrno;
×
1893
  }
1894
  taosMemoryFreeClear(db);
94,657,670✔
1895

1896
  connectReq.connType = pObj->connType;
94,657,893✔
1897
  connectReq.pid = appInfo.pid;
94,657,893✔
1898
  connectReq.startTime = appInfo.startTime;
94,657,668✔
1899
  connectReq.totpCode = totpCode;
94,657,668✔
1900
  connectReq.connectTime = taosGetTimestampMs();
94,656,031✔
1901

1902
  tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
94,656,031✔
1903
  tstrncpy(connectReq.user, pObj->user, sizeof(connectReq.user));
94,656,031✔
1904
  tstrncpy(connectReq.passwd, pObj->pass, sizeof(connectReq.passwd));
94,656,031✔
1905
  tstrncpy(connectReq.token, pObj->token, sizeof(connectReq.token));
94,656,031✔
1906
  tstrncpy(connectReq.sVer, td_version, sizeof(connectReq.sVer));
94,656,031✔
1907
  tSignConnectReq(&connectReq);
94,656,256✔
1908

1909
  int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
94,657,548✔
1910
  void*   pReq = taosMemoryMalloc(contLen);
94,655,731✔
1911
  if (NULL == pReq) {
94,657,337✔
1912
    taosMemoryFree(*pMsgSendInfo);
×
1913
    return terrno;
×
1914
  }
1915

1916
  if (-1 == tSerializeSConnectReq(pReq, contLen, &connectReq)) {
94,657,337✔
1917
    taosMemoryFree(*pMsgSendInfo);
1,360✔
1918
    taosMemoryFree(pReq);
×
1919
    return terrno;
×
1920
  }
1921

1922
  (*pMsgSendInfo)->msgInfo.len = contLen;
94,654,181✔
1923
  (*pMsgSendInfo)->msgInfo.pData = pReq;
94,654,399✔
1924
  return TSDB_CODE_SUCCESS;
94,654,379✔
1925
}
1926

1927
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
2,147,483,647✔
1928
  if (NULL == pEpSet) {
2,147,483,647✔
1929
    return;
2,147,483,647✔
1930
  }
1931

1932
  switch (pSendInfo->target.type) {
5,096,375✔
1933
    case TARGET_TYPE_MNODE:
224✔
1934
      if (NULL == pTscObj) {
224✔
1935
        tscError("mnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
×
1936
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1937
        return;
988✔
1938
      }
1939

1940
      SEpSet  originEpset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
224✔
1941
      SEpSet* pOrig = &originEpset;
224✔
1942
      SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
224✔
1943
      SEp*    pNewEp = &pEpSet->eps[pEpSet->inUse];
224✔
1944
      tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in client", pOrig->inUse, pOrig->numOfEps,
224✔
1945
               pOrigEp->fqdn, pOrigEp->port, pEpSet->inUse, pEpSet->numOfEps, pNewEp->fqdn, pNewEp->port);
1946
      updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
224✔
1947
      break;
507,291✔
1948
    case TARGET_TYPE_VNODE: {
4,842,990✔
1949
      if (NULL == pTscObj) {
4,843,030✔
1950
        tscError("vnode epset changed but not able to update it, msg:%s, reqObjRefId:%" PRIx64,
988✔
1951
                 TMSG_INFO(pMsg->msgType), pSendInfo->requestObjRefId);
1952
        return;
988✔
1953
      }
1954

1955
      SCatalog* pCatalog = NULL;
4,842,042✔
1956
      int32_t   code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
4,842,042✔
1957
      if (code != TSDB_CODE_SUCCESS) {
4,842,016✔
1958
        tscError("fail to get catalog handle, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
×
1959
                 tstrerror(code));
1960
        return;
×
1961
      }
1962

1963
      code = catalogUpdateVgEpSet(pCatalog, pSendInfo->target.dbFName, pSendInfo->target.vgId, pEpSet);
4,842,016✔
1964
      if (code != TSDB_CODE_SUCCESS) {
4,842,105✔
1965
        tscError("fail to update catalog vg epset, clusterId:0x%" PRIx64 ", error:%s", pTscObj->pAppInfo->clusterId,
12✔
1966
                 tstrerror(code));
1967
        return;
×
1968
      }
1969
      taosMemoryFreeClear(pSendInfo->target.dbFName);
4,842,093✔
1970
      break;
4,842,153✔
1971
    }
1972
    default:
254,480✔
1973
      tscDebug("epset changed, not updated, msgType %s", TMSG_INFO(pMsg->msgType));
254,480✔
1974
      break;
255,088✔
1975
  }
1976
}
1977

1978
int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
2,147,483,647✔
1979
  SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
2,147,483,647✔
1980
  if (pMsg->info.ahandle == NULL) {
2,147,483,647✔
1981
    tscError("doProcessMsgFromServer pMsg->info.ahandle == NULL");
544,744✔
1982
    rpcFreeCont(pMsg->pCont);
544,744✔
1983
    taosMemoryFree(pEpSet);
544,744✔
1984
    return TSDB_CODE_TSC_INTERNAL_ERROR;
544,744✔
1985
  }
1986

1987
  STscObj* pTscObj = NULL;
2,147,483,647✔
1988

1989
  STraceId* trace = &pMsg->info.traceId;
2,147,483,647✔
1990
  char      tbuf[40] = {0};
2,147,483,647✔
1991
  TRACE_TO_STR(trace, tbuf);
2,147,483,647✔
1992

1993
  tscDebug("QID:%s, process message from server, handle:%p, message:%s, size:%d, code:%s", tbuf, pMsg->info.handle,
2,147,483,647✔
1994
           TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code));
1995

1996
  if (pSendInfo->requestObjRefId != 0) {
2,147,483,647✔
1997
    SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
2,049,811,833✔
1998
    if (pRequest) {
2,049,804,444✔
1999
      if (pRequest->self != pSendInfo->requestObjRefId) {
2,034,246,809✔
2000
        tscError("doProcessMsgFromServer req:0x%" PRId64 " != pSendInfo->requestObjRefId:0x%" PRId64, pRequest->self,
×
2001
                 pSendInfo->requestObjRefId);
2002

2003
        if (TSDB_CODE_SUCCESS != taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId)) {
×
2004
          tscError("doProcessMsgFromServer taosReleaseRef failed");
×
2005
        }
2006
        rpcFreeCont(pMsg->pCont);
×
2007
        taosMemoryFree(pEpSet);
×
2008
        destroySendMsgInfo(pSendInfo);
×
2009
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2010
      }
2011
      pTscObj = pRequest->pTscObj;
2,034,253,324✔
2012
    }
2013
  }
2014

2015
  updateTargetEpSet(pSendInfo, pTscObj, pMsg, pEpSet);
2,147,483,647✔
2016

2017
  SDataBuf buf = {.msgType = pMsg->msgType,
2,147,483,647✔
2018
                  .len = pMsg->contLen,
2,147,483,647✔
2019
                  .pData = NULL,
2020
                  .handle = pMsg->info.handle,
2,147,483,647✔
2021
                  .handleRefId = pMsg->info.refId,
2,147,483,647✔
2022
                  .pEpSet = pEpSet};
2023

2024
  if (pMsg->contLen > 0) {
2,147,483,647✔
2025
    buf.pData = taosMemoryCalloc(1, pMsg->contLen);
2,147,483,647✔
2026
    if (buf.pData == NULL) {
2,147,483,647✔
2027
      pMsg->code = terrno;
×
2028
    } else {
2029
      (void)memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
2,147,483,647✔
2030
    }
2031
  }
2032

2033
  (void)pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
2,147,483,647✔
2034

2035
  if (pTscObj) {
2,147,483,647✔
2036
    int32_t code = taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
2,034,185,220✔
2037
    if (TSDB_CODE_SUCCESS != code) {
2,034,265,375✔
2038
      tscError("doProcessMsgFromServer taosReleaseRef failed");
351✔
2039
      terrno = code;
351✔
2040
      pMsg->code = code;
351✔
2041
    }
2042
  }
2043

2044
  rpcFreeCont(pMsg->pCont);
2,147,483,647✔
2045
  destroySendMsgInfo(pSendInfo);
2,147,483,647✔
2046
  return TSDB_CODE_SUCCESS;
2,147,483,647✔
2047
}
2048

2049
int32_t doProcessMsgFromServer(void* param) {
2,147,483,647✔
2050
  AsyncArg* arg = (AsyncArg*)param;
2,147,483,647✔
2051
  int32_t   code = doProcessMsgFromServerImpl(&arg->msg, arg->pEpset);
2,147,483,647✔
2052
  taosMemoryFree(arg);
2,147,483,647✔
2053
  return code;
2,147,483,647✔
2054
}
2055

2056
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
2,147,483,647✔
2057
  int32_t code = 0;
2,147,483,647✔
2058
  SEpSet* tEpSet = NULL;
2,147,483,647✔
2059

2060
  tscDebug("msg callback, ahandle %p", pMsg->info.ahandle);
2,147,483,647✔
2061

2062
  if (pEpSet != NULL) {
2,147,483,647✔
2063
    tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
5,098,437✔
2064
    if (NULL == tEpSet) {
5,097,717✔
2065
      code = terrno;
×
2066
      pMsg->code = terrno;
×
2067
      goto _exit;
×
2068
    }
2069
    (void)memcpy((void*)tEpSet, (void*)pEpSet, sizeof(SEpSet));
5,097,717✔
2070
  }
2071

2072
  // pMsg is response msg
2073
  if (pMsg->msgType == TDMT_MND_CONNECT + 1) {
2,147,483,647✔
2074
    // restore origin code
2075
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED) {
94,600,463✔
2076
      pMsg->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
×
2077
    } else if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
94,600,508✔
2078
      pMsg->code = TSDB_CODE_RPC_BROKEN_LINK;
×
2079
    }
2080
  } else {
2081
    // uniform to one error code: TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED
2082
    if (pMsg->code == TSDB_CODE_RPC_SOMENODE_BROKEN_LINK) {
2,147,483,647✔
2083
      pMsg->code = TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED;
×
2084
    }
2085
  }
2086

2087
  AsyncArg* arg = taosMemoryCalloc(1, sizeof(AsyncArg));
2,147,483,647✔
2088
  if (NULL == arg) {
2,147,483,647✔
2089
    code = terrno;
×
2090
    pMsg->code = code;
×
2091
    goto _exit;
×
2092
  }
2093

2094
  arg->msg = *pMsg;
2,147,483,647✔
2095
  arg->pEpset = tEpSet;
2,147,483,647✔
2096

2097
  if ((code = taosAsyncExec(doProcessMsgFromServer, arg, NULL)) != 0) {
2,147,483,647✔
2098
    pMsg->code = code;
447,972✔
2099
    taosMemoryFree(arg);
447,972✔
2100
    goto _exit;
382,394✔
2101
  }
2102
  return;
2,147,483,647✔
2103

2104
_exit:
382,394✔
2105
  tscError("failed to sched msg to tsc since %s", tstrerror(code));
382,394✔
2106
  code = doProcessMsgFromServerImpl(pMsg, tEpSet);
382,394✔
2107
  if (code != 0) {
382,394✔
2108
    tscError("failed to sched msg to tsc, tsc ready quit");
×
2109
  }
2110
}
2111

2112
TAOS* taos_connect_totp(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
2,458✔
2113
                        uint16_t port) {
2114
  tscInfo("try to connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
2,458✔
2115
  if (user == NULL) {
2,458✔
2116
    user = TSDB_DEFAULT_USER;
×
2117
  }
2118

2119
  if (pass == NULL) {
2,458✔
2120
    pass = TSDB_DEFAULT_PASS;
×
2121
  }
2122

2123
  STscObj* pObj = NULL;
2,458✔
2124
  int32_t  code = taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__QUERY, &pObj);
2,458✔
2125
  if (TSDB_CODE_SUCCESS == code) {
2,458✔
2126
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1,663✔
2127
    if (NULL == rid) {
1,663✔
2128
      tscError("out of memory when taos_connect_totp to %s:%u, user:%s db:%s", ip, port, user, db);
×
2129
      return NULL;
×
2130
    }
2131
    *rid = pObj->id;
1,663✔
2132
    return (TAOS*)rid;
1,663✔
2133
  } else {
2134
    terrno = code;
795✔
2135
  }
2136

2137
  return NULL;
795✔
2138
}
2139

2140
int taos_connect_test(const char* ip, const char* user, const char* pass, const char* totp, const char* db,
×
2141
                      uint16_t port) {
2142
  tscInfo("try to test connect to %s:%u by totp, user:%s db:%s", ip, port, user, db);
×
2143
  if (user == NULL) {
×
2144
    user = TSDB_DEFAULT_USER;
×
2145
  }
2146

2147
  if (pass == NULL) {
×
2148
    pass = TSDB_DEFAULT_PASS;
×
2149
  }
2150

2151
  STscObj* pObj = NULL;
×
2152
  return taos_connect_internal(ip, user, pass, totp, db, port, CONN_TYPE__AUTH_TEST, &pObj);
×
2153
}
2154

2155
TAOS* taos_connect_token(const char* ip, const char* token, const char* db, uint16_t port) {
2,880✔
2156
  tscInfo("try to connect to %s:%u by token, db:%s", ip, port, db);
2,880✔
2157

2158
  STscObj* pObj = NULL;
2,880✔
2159
  int32_t  code = taos_connect_by_auth(ip, NULL, token, NULL, db, port, CONN_TYPE__QUERY, &pObj);
2,880✔
2160
  if (TSDB_CODE_SUCCESS == code) {
2,880✔
2161
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
1,340✔
2162
    if (NULL == rid) {
1,340✔
2163
      tscError("out of memory when taos_connect_token to %s:%u db:%s", ip, port, db);
×
2164
      return NULL;
×
2165
    }
2166
    *rid = pObj->id;
1,340✔
2167
    return (TAOS*)rid;
1,340✔
2168
  } else {
2169
    terrno = code;
1,540✔
2170
  }
2171

2172
  return NULL;
1,540✔
2173
}
2174

2175
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
142✔
2176
  tscInfo("try to connect to %s:%u by auth, user:%s db:%s", ip, port, user, db);
142✔
2177
  if (user == NULL) {
142✔
2178
    user = TSDB_DEFAULT_USER;
×
2179
  }
2180

2181
  if (auth == NULL) {
142✔
2182
    tscError("No auth info is given, failed to connect to server");
×
2183
    return NULL;
×
2184
  }
2185

2186
  STscObj* pObj = NULL;
142✔
2187
  int32_t  code = taos_connect_by_auth(ip, user, auth, NULL, db, port, CONN_TYPE__QUERY, &pObj);
142✔
2188
  if (TSDB_CODE_SUCCESS == code) {
142✔
2189
    int64_t* rid = taosMemoryCalloc(1, sizeof(int64_t));
×
2190
    if (NULL == rid) {
×
2191
      tscError("out of memory when taos connect to %s:%u, user:%s db:%s", ip, port, user, db);
×
2192
    }
2193
    *rid = pObj->id;
×
2194
    return (TAOS*)rid;
×
2195
  }
2196

2197
  return NULL;
142✔
2198
}
2199

2200
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
2,147,483,647✔
2201
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,147,483,647✔
2202
    SResultColumn* pCol = &pResultInfo->pCol[i];
2,147,483,647✔
2203

2204
    int32_t type = pResultInfo->fields[i].type;
2,147,483,647✔
2205
    int32_t schemaBytes = calcSchemaBytesFromTypeBytes(type, pResultInfo->userFields[i].bytes, false);
2,147,483,647✔
2206

2207
    if (IS_VAR_DATA_TYPE(type)) {
2,147,483,647✔
2208
      if (!IS_VAR_NULL_TYPE(type, schemaBytes) && pCol->offset[pResultInfo->current] != -1) {
2,147,483,647✔
2209
        char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
2,147,483,647✔
2210

2211
        if (IS_STR_DATA_BLOB(type)) {
2,147,483,647✔
2212
          pResultInfo->length[i] = blobDataLen(pStart);
25,880✔
2213
          pResultInfo->row[i] = blobDataVal(pStart);
396✔
2214
        } else {
2215
          pResultInfo->length[i] = varDataLen(pStart);
2,147,483,647✔
2216
          pResultInfo->row[i] = varDataVal(pStart);
2,147,483,647✔
2217
        }
2218
      } else {
2219
        pResultInfo->row[i] = NULL;
297,497,807✔
2220
        pResultInfo->length[i] = 0;
297,545,002✔
2221
      }
2222
    } else {
2223
      if (!colDataIsNull_f(pCol, pResultInfo->current)) {
2,147,483,647✔
2224
        pResultInfo->row[i] = pResultInfo->pCol[i].pData + schemaBytes * pResultInfo->current;
2,147,483,647✔
2225
        pResultInfo->length[i] = schemaBytes;
2,147,483,647✔
2226
      } else {
2227
        pResultInfo->row[i] = NULL;
1,283,080,913✔
2228
        pResultInfo->length[i] = 0;
1,283,386,318✔
2229
      }
2230
    }
2231
  }
2232
}
2,147,483,647✔
2233

2234
void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
×
2235
  if (pRequest == NULL) {
×
2236
    return NULL;
×
2237
  }
2238

2239
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
×
2240
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
×
2241
    // All data has returned to App already, no need to try again
2242
    if (pResultInfo->completed) {
×
2243
      pResultInfo->numOfRows = 0;
×
2244
      return NULL;
×
2245
    }
2246

2247
    SReqResultInfo* pResInfo = &pRequest->body.resInfo;
×
2248
    SSchedulerReq   req = {.syncReq = true, .pFetchRes = (void**)&pResInfo->pData};
×
2249

2250
    pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
×
2251
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2252
      pResultInfo->numOfRows = 0;
×
2253
      return NULL;
×
2254
    }
2255

2256
    pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (const SRetrieveTableRsp*)pResInfo->pData,
×
2257
                                           convertUcs4, pRequest->stmtBindVersion > 0);
×
2258
    if (pRequest->code != TSDB_CODE_SUCCESS) {
×
2259
      pResultInfo->numOfRows = 0;
×
2260
      return NULL;
×
2261
    }
2262

2263
    tscDebug("req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64
×
2264
             ", complete:%d, QID:0x%" PRIx64,
2265
             pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
2266

2267
    STscObj*            pTscObj = pRequest->pTscObj;
×
2268
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
×
2269
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
×
2270

2271
    if (pResultInfo->numOfRows == 0) {
×
2272
      return NULL;
×
2273
    }
2274
  }
2275

2276
  if (setupOneRowPtr) {
×
2277
    doSetOneRowPtr(pResultInfo);
×
2278
    pResultInfo->current += 1;
×
2279
  }
2280

2281
  return pResultInfo->row;
×
2282
}
2283

2284
static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) {
252,258,760✔
2285
  tsem_t* sem = param;
252,258,760✔
2286
  if (TSDB_CODE_SUCCESS != tsem_post(sem)) {
252,258,760✔
2287
    tscError("failed to post sem, code:%s", terrstr());
×
2288
  }
2289
}
252,258,763✔
2290

2291
void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) {
1,819,706,044✔
2292
  if (pRequest == NULL) {
1,819,706,044✔
2293
    return NULL;
×
2294
  }
2295

2296
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
1,819,706,044✔
2297
  if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
1,819,706,160✔
2298
    // All data has returned to App already, no need to try again
2299
    if (pResultInfo->completed) {
356,583,881✔
2300
      pResultInfo->numOfRows = 0;
104,325,307✔
2301
      CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
208,650,543✔
2302
      return NULL;
104,325,307✔
2303
    }
2304

2305
    // convert ucs4 to native multi-bytes string
2306
    pResultInfo->convertUcs4 = convertUcs4;
252,257,516✔
2307
    tsem_t sem;
251,549,107✔
2308
    if (TSDB_CODE_SUCCESS != tsem_init(&sem, 0, 0)) {
252,258,693✔
2309
      tscError("failed to init sem, code:%s", terrstr());
×
2310
    }
2311
    taos_fetch_rows_a(pRequest, syncFetchFn, &sem);
252,258,700✔
2312
    if (TSDB_CODE_SUCCESS != tsem_wait(&sem)) {
252,258,763✔
2313
      tscError("failed to wait sem, code:%s", terrstr());
×
2314
    }
2315
    if (TSDB_CODE_SUCCESS != tsem_destroy(&sem)) {
252,258,763✔
2316
      tscError("failed to destroy sem, code:%s", terrstr());
×
2317
    }
2318
    pRequest->inCallback = false;
252,258,742✔
2319
  }
2320

2321
  if (pResultInfo->numOfRows == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
1,715,380,957✔
2322
    return NULL;
11,947,652✔
2323
  } else {
2324
    if (setupOneRowPtr) {
1,703,433,278✔
2325
      doSetOneRowPtr(pResultInfo);
1,461,710,900✔
2326
      pResultInfo->current += 1;
1,461,710,959✔
2327
    }
2328

2329
    return pResultInfo->row;
1,703,433,337✔
2330
  }
2331
}
2332

2333
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
370,289,168✔
2334
  if (pResInfo->row == NULL) {
370,289,168✔
2335
    pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
229,987,877✔
2336
    pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
229,987,058✔
2337
    pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
229,984,197✔
2338
    pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
229,985,616✔
2339

2340
    if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
229,986,584✔
2341
      taosMemoryFree(pResInfo->row);
6,342✔
2342
      taosMemoryFree(pResInfo->pCol);
×
2343
      taosMemoryFree(pResInfo->length);
×
2344
      taosMemoryFree(pResInfo->convertBuf);
×
2345
      return terrno;
×
2346
    }
2347
  }
2348

2349
  return TSDB_CODE_SUCCESS;
370,287,038✔
2350
}
2351

2352
static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t* colLength, bool isStmt) {
370,079,204✔
2353
  int32_t idx = -1;
370,079,204✔
2354
  iconv_t conv = taosAcquireConv(&idx, C2M, pResultInfo->charsetCxt);
370,079,612✔
2355
  if (conv == (iconv_t)-1) return TSDB_CODE_TSC_INTERNAL_ERROR;
370,073,806✔
2356

2357
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,100,311,225✔
2358
    int32_t type = pResultInfo->fields[i].type;
1,730,247,717✔
2359
    int32_t schemaBytes =
2360
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
1,730,246,204✔
2361

2362
    if (type == TSDB_DATA_TYPE_NCHAR && colLength[i] > 0) {
1,730,240,476✔
2363
      char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
103,450,974✔
2364
      if (p == NULL) {
103,450,974✔
2365
        taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
×
2366
        return terrno;
×
2367
      }
2368

2369
      pResultInfo->convertBuf[i] = p;
103,450,974✔
2370

2371
      SResultColumn* pCol = &pResultInfo->pCol[i];
103,450,974✔
2372
      for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
2,147,483,647✔
2373
        if (pCol->offset[j] != -1) {
2,147,483,647✔
2374
          char* pStart = pCol->offset[j] + pCol->pData;
2,147,483,647✔
2375

2376
          int32_t len = taosUcs4ToMbsEx((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p), conv);
2,147,483,647✔
2377
          if (len < 0 || len > schemaBytes || (p + len) >= (pResultInfo->convertBuf[i] + colLength[i])) {
2,147,483,647✔
2378
            tscError(
2,334✔
2379
                "doConvertUCS4 error, invalid data. len:%d, bytes:%d, (p + len):%p, (pResultInfo->convertBuf[i] + "
2380
                "colLength[i]):%p",
2381
                len, schemaBytes, (p + len), (pResultInfo->convertBuf[i] + colLength[i]));
2382
            taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
2,334✔
2383
            return TSDB_CODE_TSC_INTERNAL_ERROR;
66✔
2384
          }
2385

2386
          varDataSetLen(p, len);
2,147,483,647✔
2387
          pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
2,147,483,647✔
2388
          p += (len + VARSTR_HEADER_SIZE);
2,147,483,647✔
2389
        }
2390
      }
2391

2392
      pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
103,450,908✔
2393
      pResultInfo->row[i] = pResultInfo->pCol[i].pData;
103,450,908✔
2394
    }
2395
  }
2396
  taosReleaseConv(idx, conv, C2M, pResultInfo->charsetCxt);
370,076,733✔
2397
  return TSDB_CODE_SUCCESS;
370,080,317✔
2398
}
2399

2400
static int32_t convertDecimalType(SReqResultInfo* pResultInfo) {
370,075,672✔
2401
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,100,299,462✔
2402
    TAOS_FIELD_E* pFieldE = pResultInfo->fields + i;
1,730,235,418✔
2403
    TAOS_FIELD*   pField = pResultInfo->userFields + i;
1,730,236,345✔
2404
    int32_t       type = pFieldE->type;
1,730,234,801✔
2405
    int32_t       bufLen = 0;
1,730,238,434✔
2406
    char*         p = NULL;
1,730,238,434✔
2407
    if (!IS_DECIMAL_TYPE(type) || !pResultInfo->pCol[i].pData) {
1,730,238,434✔
2408
      continue;
1,728,421,107✔
2409
    } else {
2410
      bufLen = 64;
1,810,350✔
2411
      p = taosMemoryRealloc(pResultInfo->convertBuf[i], bufLen * pResultInfo->numOfRows);
1,810,350✔
2412
      pFieldE->bytes = bufLen;
1,810,350✔
2413
      pField->bytes = bufLen;
1,810,350✔
2414
    }
2415
    if (!p) return terrno;
1,810,350✔
2416
    pResultInfo->convertBuf[i] = p;
1,810,350✔
2417

2418
    for (int32_t j = 0; j < pResultInfo->numOfRows; ++j) {
1,085,429,556✔
2419
      int32_t code = decimalToStr((DecimalWord*)(pResultInfo->pCol[i].pData + j * tDataTypes[type].bytes), type,
1,083,619,206✔
2420
                                  pFieldE->precision, pFieldE->scale, p, bufLen);
1,083,619,206✔
2421
      p += bufLen;
1,083,619,206✔
2422
      if (TSDB_CODE_SUCCESS != code) {
1,083,619,206✔
2423
        return code;
×
2424
      }
2425
    }
2426
    pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
1,810,350✔
2427
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
1,810,350✔
2428
  }
2429
  return 0;
370,075,352✔
2430
}
2431

2432
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols) {
447,022✔
2433
  return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) * 3 + sizeof(uint64_t) +
893,648✔
2434
         numOfCols * (sizeof(int8_t) + sizeof(int32_t));
446,626✔
2435
}
2436

2437
static int32_t estimateJsonLen(SReqResultInfo* pResultInfo) {
223,511✔
2438
  char*   p = (char*)pResultInfo->pData;
223,511✔
2439
  int32_t blockVersion = *(int32_t*)p;
223,511✔
2440

2441
  int32_t numOfRows = pResultInfo->numOfRows;
223,511✔
2442
  int32_t numOfCols = pResultInfo->numOfCols;
223,511✔
2443

2444
  // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
2445
  // length |
2446
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
223,511✔
2447
  if (numOfCols != cols) {
223,511✔
2448
    tscError("estimateJsonLen error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2449
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2450
  }
2451

2452
  int32_t  len = getVersion1BlockMetaSize(p, numOfCols);
223,511✔
2453
  int32_t* colLength = (int32_t*)(p + len);
223,511✔
2454
  len += sizeof(int32_t) * numOfCols;
223,511✔
2455

2456
  char* pStart = p + len;
223,511✔
2457
  for (int32_t i = 0; i < numOfCols; ++i) {
952,464✔
2458
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
728,953✔
2459

2460
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
728,953✔
2461
      int32_t* offset = (int32_t*)pStart;
259,019✔
2462
      int32_t  lenTmp = numOfRows * sizeof(int32_t);
259,019✔
2463
      len += lenTmp;
259,019✔
2464
      pStart += lenTmp;
259,019✔
2465

2466
      int32_t estimateColLen = 0;
259,019✔
2467
      for (int32_t j = 0; j < numOfRows; ++j) {
1,242,676✔
2468
        if (offset[j] == -1) {
983,657✔
2469
          continue;
55,264✔
2470
        }
2471
        char* data = offset[j] + pStart;
928,393✔
2472

2473
        int32_t jsonInnerType = *data;
928,393✔
2474
        char*   jsonInnerData = data + CHAR_BYTES;
928,393✔
2475
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
928,393✔
2476
          estimateColLen += (VARSTR_HEADER_SIZE + strlen(TSDB_DATA_NULL_STR_L));
12,864✔
2477
        } else if (tTagIsJson(data)) {
915,529✔
2478
          estimateColLen += (VARSTR_HEADER_SIZE + ((const STag*)(data))->len);
227,660✔
2479
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
687,869✔
2480
          estimateColLen += varDataTLen(jsonInnerData) + CHAR_BYTES * 2;
639,629✔
2481
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
48,240✔
2482
          estimateColLen += (VARSTR_HEADER_SIZE + 32);
35,376✔
2483
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
12,864✔
2484
          estimateColLen += (VARSTR_HEADER_SIZE + 5);
12,864✔
2485
        } else if (IS_STR_DATA_BLOB(jsonInnerType)) {
×
2486
          estimateColLen += (BLOBSTR_HEADER_SIZE + 32);
×
2487
        } else {
2488
          tscError("estimateJsonLen error: invalid type:%d", jsonInnerType);
×
2489
          return -1;
×
2490
        }
2491
      }
2492
      len += TMAX(colLen, estimateColLen);
259,019✔
2493
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
469,934✔
2494
      int32_t lenTmp = numOfRows * sizeof(int32_t);
64,364✔
2495
      len += (lenTmp + colLen);
64,364✔
2496
      pStart += lenTmp;
64,364✔
2497
    } else {
2498
      int32_t lenTmp = BitmapLen(pResultInfo->numOfRows);
405,570✔
2499
      len += (lenTmp + colLen);
405,570✔
2500
      pStart += lenTmp;
405,570✔
2501
    }
2502
    pStart += colLen;
728,953✔
2503
  }
2504

2505
  // Ensure the complete structure of the block, including the blankfill field,
2506
  // even though it is not used on the client side.
2507
  len += sizeof(bool);
223,511✔
2508
  return len;
223,511✔
2509
}
2510

2511
static int32_t doConvertJson(SReqResultInfo* pResultInfo) {
370,285,198✔
2512
  int32_t numOfRows = pResultInfo->numOfRows;
370,285,198✔
2513
  int32_t numOfCols = pResultInfo->numOfCols;
370,286,810✔
2514
  bool    needConvert = false;
370,285,718✔
2515
  for (int32_t i = 0; i < numOfCols; ++i) {
2,101,131,012✔
2516
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
1,731,066,165✔
2517
      needConvert = true;
223,511✔
2518
      break;
223,511✔
2519
    }
2520
  }
2521

2522
  if (!needConvert) {
370,288,358✔
2523
    return TSDB_CODE_SUCCESS;
370,064,847✔
2524
  }
2525

2526
  tscDebug("start to convert form json format string");
223,511✔
2527

2528
  char*   p = (char*)pResultInfo->pData;
223,511✔
2529
  int32_t blockVersion = *(int32_t*)p;
223,511✔
2530
  int32_t dataLen = estimateJsonLen(pResultInfo);
223,511✔
2531
  if (dataLen <= 0) {
223,511✔
2532
    tscError("doConvertJson error: estimateJsonLen failed");
×
2533
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2534
  }
2535

2536
  taosMemoryFreeClear(pResultInfo->convertJson);
223,511✔
2537
  pResultInfo->convertJson = taosMemoryCalloc(1, dataLen);
223,511✔
2538
  if (pResultInfo->convertJson == NULL) return terrno;
223,511✔
2539
  char* p1 = pResultInfo->convertJson;
223,511✔
2540

2541
  int32_t totalLen = 0;
223,511✔
2542
  int32_t cols = *(int32_t*)(p + sizeof(int32_t) * 3);
223,511✔
2543
  if (numOfCols != cols) {
223,511✔
2544
    tscError("doConvertJson error: numOfCols:%d != cols:%d", numOfCols, cols);
×
2545
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2546
  }
2547

2548
  int32_t len = getVersion1BlockMetaSize(p, numOfCols);
223,511✔
2549
  (void)memcpy(p1, p, len);
223,511✔
2550

2551
  p += len;
223,511✔
2552
  p1 += len;
223,511✔
2553
  totalLen += len;
223,511✔
2554

2555
  len = sizeof(int32_t) * numOfCols;
223,511✔
2556
  int32_t* colLength = (int32_t*)p;
223,511✔
2557
  int32_t* colLength1 = (int32_t*)p1;
223,511✔
2558
  (void)memcpy(p1, p, len);
223,511✔
2559
  p += len;
223,511✔
2560
  p1 += len;
223,511✔
2561
  totalLen += len;
223,511✔
2562

2563
  char* pStart = p;
223,511✔
2564
  char* pStart1 = p1;
223,511✔
2565
  for (int32_t i = 0; i < numOfCols; ++i) {
952,464✔
2566
    int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
728,953✔
2567
    int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
728,953✔
2568
    if (colLen >= dataLen) {
728,953✔
2569
      tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
×
2570
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2571
    }
2572
    if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
728,953✔
2573
      int32_t* offset = (int32_t*)pStart;
259,019✔
2574
      int32_t* offset1 = (int32_t*)pStart1;
259,019✔
2575
      len = numOfRows * sizeof(int32_t);
259,019✔
2576
      (void)memcpy(pStart1, pStart, len);
259,019✔
2577
      pStart += len;
259,019✔
2578
      pStart1 += len;
259,019✔
2579
      totalLen += len;
259,019✔
2580

2581
      len = 0;
259,019✔
2582
      for (int32_t j = 0; j < numOfRows; ++j) {
1,242,676✔
2583
        if (offset[j] == -1) {
983,657✔
2584
          continue;
55,264✔
2585
        }
2586
        char* data = offset[j] + pStart;
928,393✔
2587

2588
        int32_t jsonInnerType = *data;
928,393✔
2589
        char*   jsonInnerData = data + CHAR_BYTES;
928,393✔
2590
        char    dst[TSDB_MAX_JSON_TAG_LEN] = {0};
928,393✔
2591
        if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
928,393✔
2592
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s", TSDB_DATA_NULL_STR_L);
12,864✔
2593
          varDataSetLen(dst, strlen(varDataVal(dst)));
12,864✔
2594
        } else if (tTagIsJson(data)) {
915,529✔
2595
          char* jsonString = NULL;
227,660✔
2596
          parseTagDatatoJson(data, &jsonString, pResultInfo->charsetCxt);
227,660✔
2597
          if (jsonString == NULL) {
227,660✔
2598
            tscError("doConvertJson error: parseTagDatatoJson failed");
×
2599
            return terrno;
×
2600
          }
2601
          STR_TO_VARSTR(dst, jsonString);
227,660✔
2602
          taosMemoryFree(jsonString);
227,660✔
2603
        } else if (jsonInnerType == TSDB_DATA_TYPE_NCHAR) {  // value -> "value"
687,869✔
2604
          *(char*)varDataVal(dst) = '\"';
639,629✔
2605
          char    tmp[TSDB_MAX_JSON_TAG_LEN] = {0};
639,629✔
2606
          int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(jsonInnerData), varDataLen(jsonInnerData), varDataVal(tmp),
639,629✔
2607
                                         pResultInfo->charsetCxt);
2608
          if (length <= 0) {
639,629✔
2609
            tscError("charset:%s to %s. convert failed.", DEFAULT_UNICODE_ENCODEC,
536✔
2610
                     pResultInfo->charsetCxt != NULL ? ((SConvInfo*)(pResultInfo->charsetCxt))->charset : tsCharset);
2611
            length = 0;
536✔
2612
          }
2613
          int32_t escapeLength = escapeToPrinted(varDataVal(dst) + CHAR_BYTES, TSDB_MAX_JSON_TAG_LEN - CHAR_BYTES * 2,
639,629✔
2614
                                                 varDataVal(tmp), length);
2615
          varDataSetLen(dst, escapeLength + CHAR_BYTES * 2);
639,629✔
2616
          *(char*)POINTER_SHIFT(varDataVal(dst), escapeLength + CHAR_BYTES) = '\"';
639,629✔
2617
          tscError("value:%s.", varDataVal(dst));
639,629✔
2618
        } else if (jsonInnerType == TSDB_DATA_TYPE_DOUBLE) {
48,240✔
2619
          double jsonVd = *(double*)(jsonInnerData);
35,376✔
2620
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%.9lf", jsonVd);
35,376✔
2621
          varDataSetLen(dst, strlen(varDataVal(dst)));
35,376✔
2622
        } else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
12,864✔
2623
          (void)snprintf(varDataVal(dst), TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE, "%s",
12,864✔
2624
                         (*((char*)jsonInnerData) == 1) ? "true" : "false");
12,864✔
2625
          varDataSetLen(dst, strlen(varDataVal(dst)));
12,864✔
2626
        } else {
2627
          tscError("doConvertJson error: invalid type:%d", jsonInnerType);
×
2628
          return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2629
        }
2630

2631
        offset1[j] = len;
928,393✔
2632
        (void)memcpy(pStart1 + len, dst, varDataTLen(dst));
928,393✔
2633
        len += varDataTLen(dst);
928,393✔
2634
      }
2635
      colLen1 = len;
259,019✔
2636
      totalLen += colLen1;
259,019✔
2637
      colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
259,019✔
2638
    } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
469,934✔
2639
      len = numOfRows * sizeof(int32_t);
64,364✔
2640
      (void)memcpy(pStart1, pStart, len);
64,364✔
2641
      pStart += len;
64,364✔
2642
      pStart1 += len;
64,364✔
2643
      totalLen += len;
64,364✔
2644
      totalLen += colLen;
64,364✔
2645
      (void)memcpy(pStart1, pStart, colLen);
64,364✔
2646
    } else {
2647
      len = BitmapLen(pResultInfo->numOfRows);
405,570✔
2648
      (void)memcpy(pStart1, pStart, len);
405,570✔
2649
      pStart += len;
405,570✔
2650
      pStart1 += len;
405,570✔
2651
      totalLen += len;
405,570✔
2652
      totalLen += colLen;
405,570✔
2653
      (void)memcpy(pStart1, pStart, colLen);
405,570✔
2654
    }
2655
    pStart += colLen;
728,953✔
2656
    pStart1 += colLen1;
728,953✔
2657
  }
2658

2659
  // Ensure the complete structure of the block, including the blankfill field,
2660
  // even though it is not used on the client side.
2661
  // (void)memcpy(pStart1, pStart, sizeof(bool));
2662
  totalLen += sizeof(bool);
223,511✔
2663

2664
  *(int32_t*)(pResultInfo->convertJson + 4) = totalLen;
223,511✔
2665
  pResultInfo->pData = pResultInfo->convertJson;
223,511✔
2666
  return TSDB_CODE_SUCCESS;
223,511✔
2667
}
2668

2669
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, bool convertUcs4, bool isStmt) {
396,713,165✔
2670
  bool convertForDecimal = convertUcs4;
396,713,165✔
2671
  if (pResultInfo == NULL || pResultInfo->numOfCols <= 0 || pResultInfo->fields == NULL) {
396,713,165✔
2672
    tscError("setResultDataPtr paras error");
×
2673
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2674
  }
2675

2676
  if (pResultInfo->numOfRows == 0) {
396,714,299✔
2677
    return TSDB_CODE_SUCCESS;
26,422,699✔
2678
  }
2679

2680
  if (pResultInfo->pData == NULL) {
370,290,746✔
2681
    tscError("setResultDataPtr error: pData is NULL");
×
2682
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2683
  }
2684

2685
  int32_t code = doPrepareResPtr(pResultInfo);
370,288,235✔
2686
  if (code != TSDB_CODE_SUCCESS) {
370,288,704✔
2687
    return code;
×
2688
  }
2689
  code = doConvertJson(pResultInfo);
370,288,704✔
2690
  if (code != TSDB_CODE_SUCCESS) {
370,282,217✔
2691
    return code;
×
2692
  }
2693

2694
  char* p = (char*)pResultInfo->pData;
370,282,217✔
2695

2696
  // version:
2697
  int32_t blockVersion = *(int32_t*)p;
370,283,849✔
2698
  p += sizeof(int32_t);
370,285,952✔
2699

2700
  int32_t dataLen = *(int32_t*)p;
370,286,768✔
2701
  p += sizeof(int32_t);
370,286,360✔
2702

2703
  int32_t rows = *(int32_t*)p;
370,284,707✔
2704
  p += sizeof(int32_t);
370,284,707✔
2705

2706
  int32_t cols = *(int32_t*)p;
370,284,341✔
2707
  p += sizeof(int32_t);
370,286,831✔
2708

2709
  if (rows != pResultInfo->numOfRows || cols != pResultInfo->numOfCols) {
370,285,994✔
2710
    tscError("setResultDataPtr paras error:rows;%d numOfRows:%" PRId64 " cols:%d numOfCols:%d", rows,
5,193✔
2711
             pResultInfo->numOfRows, cols, pResultInfo->numOfCols);
2712
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2713
  }
2714

2715
  int32_t hasColumnSeg = *(int32_t*)p;
370,281,596✔
2716
  p += sizeof(int32_t);
370,286,231✔
2717

2718
  uint64_t groupId = taosGetUInt64Aligned((uint64_t*)p);
370,286,052✔
2719
  p += sizeof(uint64_t);
370,286,052✔
2720

2721
  // check fields
2722
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,101,404,285✔
2723
    int8_t type = *(int8_t*)p;
1,731,128,811✔
2724
    p += sizeof(int8_t);
1,731,123,587✔
2725

2726
    int32_t bytes = *(int32_t*)p;
1,731,127,005✔
2727
    p += sizeof(int32_t);
1,731,124,498✔
2728

2729
    if (IS_DECIMAL_TYPE(type) && pResultInfo->fields[i].precision == 0) {
1,731,122,672✔
2730
      extractDecimalTypeInfoFromBytes(&bytes, &pResultInfo->fields[i].precision, &pResultInfo->fields[i].scale);
41,336✔
2731
    }
2732
  }
2733

2734
  int32_t* colLength = (int32_t*)p;
370,289,927✔
2735
  p += sizeof(int32_t) * pResultInfo->numOfCols;
370,289,927✔
2736

2737
  char* pStart = p;
370,288,959✔
2738
  for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
2,101,439,551✔
2739
    if ((pStart - pResultInfo->pData) >= dataLen) {
1,731,148,712✔
2740
      tscError("setResultDataPtr invalid offset over dataLen %d", dataLen);
×
2741
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2742
    }
2743
    if (blockVersion == BLOCK_VERSION_1) {
1,731,118,757✔
2744
      colLength[i] = htonl(colLength[i]);
1,237,095,841✔
2745
    }
2746
    if (colLength[i] >= dataLen) {
1,731,118,235✔
2747
      tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen);
×
2748
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2749
    }
2750
    if (IS_INVALID_TYPE(pResultInfo->fields[i].type)) {
1,731,140,020✔
2751
      tscError("invalid type %d", pResultInfo->fields[i].type);
987✔
2752
      return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2753
    }
2754
    if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
1,731,142,059✔
2755
      pResultInfo->pCol[i].offset = (int32_t*)pStart;
369,517,784✔
2756
      pStart += pResultInfo->numOfRows * sizeof(int32_t);
369,503,893✔
2757
    } else {
2758
      pResultInfo->pCol[i].nullbitmap = pStart;
1,361,644,703✔
2759
      pStart += BitmapLen(pResultInfo->numOfRows);
1,361,650,873✔
2760
    }
2761

2762
    pResultInfo->pCol[i].pData = pStart;
1,731,152,705✔
2763
    pResultInfo->length[i] =
2,147,483,647✔
2764
        calcSchemaBytesFromTypeBytes(pResultInfo->fields[i].type, pResultInfo->fields[i].bytes, isStmt);
2,147,483,647✔
2765
    pResultInfo->row[i] = pResultInfo->pCol[i].pData;
1,731,147,392✔
2766

2767
    pStart += colLength[i];
1,731,147,328✔
2768
  }
2769

2770
  p = pStart;
370,290,892✔
2771
  // bool blankFill = *(bool*)p;
2772
  p += sizeof(bool);
370,290,892✔
2773
  int32_t offset = p - pResultInfo->pData;
370,290,076✔
2774
  if (offset > dataLen) {
370,289,938✔
2775
    tscError("invalid offset %d, dataLen %d", offset, dataLen);
×
2776
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2777
  }
2778

2779
#ifndef DISALLOW_NCHAR_WITHOUT_ICONV
2780
  if (convertUcs4) {
370,289,938✔
2781
    code = doConvertUCS4(pResultInfo, colLength, isStmt);
370,079,797✔
2782
  }
2783
#endif
2784
  if (TSDB_CODE_SUCCESS == code && convertForDecimal) {
370,290,932✔
2785
    code = convertDecimalType(pResultInfo);
370,080,725✔
2786
  }
2787
  return code;
370,285,538✔
2788
}
2789

2790
char* getDbOfConnection(STscObj* pObj) {
1,415,037,061✔
2791
  terrno = TSDB_CODE_SUCCESS;
1,415,037,061✔
2792
  char* p = NULL;
1,415,088,513✔
2793
  (void)taosThreadMutexLock(&pObj->mutex);
1,415,088,513✔
2794
  size_t len = strlen(pObj->db);
1,415,114,290✔
2795
  if (len > 0) {
1,415,115,530✔
2796
    p = taosStrndup(pObj->db, tListLen(pObj->db));
1,015,504,387✔
2797
    if (p == NULL) {
1,015,525,782✔
2798
      tscError("failed to taosStrndup db name");
×
2799
    }
2800
  }
2801

2802
  (void)taosThreadMutexUnlock(&pObj->mutex);
1,415,136,925✔
2803
  return p;
1,415,032,455✔
2804
}
2805

2806
void setConnectionDB(STscObj* pTscObj, const char* db) {
94,411,506✔
2807
  if (db == NULL || pTscObj == NULL) {
94,411,506✔
2808
    tscError("setConnectionDB para is NULL");
×
2809
    return;
×
2810
  }
2811

2812
  (void)taosThreadMutexLock(&pTscObj->mutex);
94,452,487✔
2813
  tstrncpy(pTscObj->db, db, tListLen(pTscObj->db));
94,471,708✔
2814
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
94,471,689✔
2815
}
2816

2817
void resetConnectDB(STscObj* pTscObj) {
×
2818
  if (pTscObj == NULL) {
×
2819
    return;
×
2820
  }
2821

2822
  (void)taosThreadMutexLock(&pTscObj->mutex);
×
2823
  pTscObj->db[0] = 0;
×
2824
  (void)taosThreadMutexUnlock(&pTscObj->mutex);
×
2825
}
2826

2827
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4,
307,927,232✔
2828
                              bool isStmt) {
2829
  if (pResultInfo == NULL || pRsp == NULL) {
307,927,232✔
2830
    tscError("setQueryResultFromRsp paras is null");
39✔
2831
    return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2832
  }
2833

2834
  taosMemoryFreeClear(pResultInfo->pRspMsg);
307,927,206✔
2835
  pResultInfo->pRspMsg = (const char*)pRsp;
307,927,228✔
2836
  pResultInfo->numOfRows = htobe64(pRsp->numOfRows);
307,927,248✔
2837
  pResultInfo->current = 0;
307,927,195✔
2838
  pResultInfo->completed = (pRsp->completed == 1);
307,927,111✔
2839
  pResultInfo->precision = pRsp->precision;
307,927,067✔
2840

2841
  // decompress data if needed
2842
  int32_t payloadLen = htonl(pRsp->payloadLen);
307,927,153✔
2843

2844
  if (pRsp->compressed) {
307,927,111✔
2845
    if (pResultInfo->decompBuf == NULL) {
×
2846
      pResultInfo->decompBuf = taosMemoryMalloc(payloadLen);
×
2847
      if (pResultInfo->decompBuf == NULL) {
×
2848
        tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2849
        return terrno;
×
2850
      }
2851
      pResultInfo->decompBufSize = payloadLen;
×
2852
    } else {
2853
      if (pResultInfo->decompBufSize < payloadLen) {
×
2854
        char* p = taosMemoryRealloc(pResultInfo->decompBuf, payloadLen);
×
2855
        if (p == NULL) {
×
2856
          tscError("failed to prepare the decompress buffer, size:%d", payloadLen);
×
2857
          return terrno;
×
2858
        }
2859

2860
        pResultInfo->decompBuf = p;
×
2861
        pResultInfo->decompBufSize = payloadLen;
×
2862
      }
2863
    }
2864
  }
2865

2866
  if (payloadLen > 0) {
307,927,110✔
2867
    int32_t compLen = *(int32_t*)pRsp->data;
281,505,529✔
2868
    int32_t rawLen = *(int32_t*)(pRsp->data + sizeof(int32_t));
281,505,466✔
2869

2870
    char* pStart = (char*)pRsp->data + sizeof(int32_t) * 2;
281,505,544✔
2871

2872
    if (pRsp->compressed && compLen < rawLen) {
281,505,505✔
2873
      int32_t len = tsDecompressString(pStart, compLen, 1, pResultInfo->decompBuf, rawLen, ONE_STAGE_COMP, NULL, 0);
×
2874
      if (len < 0) {
×
2875
        tscError("tsDecompressString failed");
×
2876
        return terrno ? terrno : TSDB_CODE_FAILED;
×
2877
      }
2878
      if (len != rawLen) {
×
2879
        tscError("tsDecompressString failed, len:%d != rawLen:%d", len, rawLen);
×
2880
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2881
      }
2882
      pResultInfo->pData = pResultInfo->decompBuf;
×
2883
      pResultInfo->payloadLen = rawLen;
×
2884
    } else {
2885
      pResultInfo->pData = pStart;
281,505,463✔
2886
      pResultInfo->payloadLen = htonl(pRsp->compLen);
281,505,550✔
2887
      if (pRsp->compLen != pRsp->payloadLen) {
281,505,550✔
2888
        tscError("pRsp->compLen:%d != pRsp->payloadLen:%d", pRsp->compLen, pRsp->payloadLen);
×
2889
        return TSDB_CODE_TSC_INTERNAL_ERROR;
×
2890
      }
2891
    }
2892
  }
2893

2894
  // TODO handle the compressed case
2895
  pResultInfo->totalRows += pResultInfo->numOfRows;
307,927,048✔
2896

2897
  int32_t code = setResultDataPtr(pResultInfo, convertUcs4, isStmt);
307,927,178✔
2898
  return code;
307,925,955✔
2899
}
2900

2901
TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* details, int maxlen) {
288✔
2902
  TSDB_SERVER_STATUS code = TSDB_SRV_STATUS_UNAVAILABLE;
288✔
2903
  void*              clientRpc = NULL;
288✔
2904
  SServerStatusRsp   statusRsp = {0};
288✔
2905
  SEpSet             epSet = {.inUse = 0, .numOfEps = 1};
288✔
2906
  SRpcMsg  rpcMsg = {.info.ahandle = (void*)0x9527, .info.notFreeAhandle = 1, .msgType = TDMT_DND_SERVER_STATUS};
288✔
2907
  SRpcMsg  rpcRsp = {0};
288✔
2908
  SRpcInit rpcInit = {0};
288✔
2909
  char     pass[TSDB_PASSWORD_LEN + 1] = {0};
288✔
2910

2911
  rpcInit.label = "CHK";
288✔
2912
  rpcInit.numOfThreads = 1;
288✔
2913
  rpcInit.cfp = NULL;
288✔
2914
  rpcInit.sessions = 16;
288✔
2915
  rpcInit.connType = TAOS_CONN_CLIENT;
288✔
2916
  rpcInit.idleTime = tsShellActivityTimer * 1000;
288✔
2917
  rpcInit.compressSize = tsCompressMsgSize;
288✔
2918
  rpcInit.user = "_dnd";
288✔
2919

2920
  int32_t connLimitNum = tsNumOfRpcSessions / (tsNumOfRpcThreads * 3);
288✔
2921
  connLimitNum = TMAX(connLimitNum, 10);
288✔
2922
  connLimitNum = TMIN(connLimitNum, 500);
288✔
2923
  rpcInit.connLimitNum = connLimitNum;
288✔
2924
  rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
288✔
2925
  rpcInit.readTimeout = tsReadTimeout;
288✔
2926
  rpcInit.ipv6 = tsEnableIpv6;
288✔
2927
  rpcInit.enableSSL = tsEnableTLS;
288✔
2928

2929
  memcpy(rpcInit.caPath, tsTLSCaPath, strlen(tsTLSCaPath));
288✔
2930
  memcpy(rpcInit.certPath, tsTLSSvrCertPath, strlen(tsTLSSvrCertPath));
288✔
2931
  memcpy(rpcInit.keyPath, tsTLSSvrKeyPath, strlen(tsTLSSvrKeyPath));
288✔
2932
  memcpy(rpcInit.cliCertPath, tsTLSCliCertPath, strlen(tsTLSCliCertPath));
288✔
2933
  memcpy(rpcInit.cliKeyPath, tsTLSCliKeyPath, strlen(tsTLSCliKeyPath));
288✔
2934

2935
  if (TSDB_CODE_SUCCESS != taosVersionStrToInt(td_version, &rpcInit.compatibilityVer)) {
288✔
2936
    tscError("faild to convert taos version from str to int, errcode:%s", terrstr());
×
2937
    goto _OVER;
×
2938
  }
2939

2940
  clientRpc = rpcOpen(&rpcInit);
288✔
2941
  if (clientRpc == NULL) {
288✔
2942
    code = terrno;
×
2943
    tscError("failed to init server status client since %s", tstrerror(code));
×
2944
    goto _OVER;
×
2945
  }
2946

2947
  if (fqdn == NULL) {
288✔
2948
    fqdn = tsLocalFqdn;
288✔
2949
  }
2950

2951
  if (port == 0) {
288✔
2952
    port = tsServerPort;
288✔
2953
  }
2954

2955
  tstrncpy(epSet.eps[0].fqdn, fqdn, TSDB_FQDN_LEN);
288✔
2956
  epSet.eps[0].port = (uint16_t)port;
288✔
2957
  int32_t ret = rpcSendRecv(clientRpc, &epSet, &rpcMsg, &rpcRsp);
288✔
2958
  if (TSDB_CODE_SUCCESS != ret) {
288✔
2959
    tscError("failed to send recv since %s", tstrerror(ret));
×
2960
    goto _OVER;
×
2961
  }
2962

2963
  if (rpcRsp.code != 0 || rpcRsp.contLen <= 0 || rpcRsp.pCont == NULL) {
288✔
2964
    tscError("failed to send server status req since %s", terrstr());
70✔
2965
    goto _OVER;
70✔
2966
  }
2967

2968
  if (tDeserializeSServerStatusRsp(rpcRsp.pCont, rpcRsp.contLen, &statusRsp) != 0) {
218✔
2969
    tscError("failed to parse server status rsp since %s", terrstr());
×
2970
    goto _OVER;
×
2971
  }
2972

2973
  code = statusRsp.statusCode;
218✔
2974
  if (details != NULL) {
218✔
2975
    tstrncpy(details, statusRsp.details, maxlen);
218✔
2976
  }
2977

2978
_OVER:
274✔
2979
  if (clientRpc != NULL) {
288✔
2980
    rpcClose(clientRpc);
288✔
2981
  }
2982
  if (rpcRsp.pCont != NULL) {
288✔
2983
    rpcFreeCont(rpcRsp.pCont);
218✔
2984
  }
2985
  return code;
288✔
2986
}
2987

2988
int32_t appendTbToReq(SHashObj* pHash, int32_t pos1, int32_t len1, int32_t pos2, int32_t len2, const char* str,
1,322✔
2989
                      int32_t acctId, char* db) {
2990
  SName name = {0};
1,322✔
2991

2992
  if (len1 <= 0) {
1,322✔
2993
    return -1;
×
2994
  }
2995

2996
  const char* dbName = db;
1,322✔
2997
  const char* tbName = NULL;
1,322✔
2998
  int32_t     dbLen = 0;
1,322✔
2999
  int32_t     tbLen = 0;
1,322✔
3000
  if (len2 > 0) {
1,322✔
3001
    dbName = str + pos1;
×
3002
    dbLen = len1;
×
3003
    tbName = str + pos2;
×
3004
    tbLen = len2;
×
3005
  } else {
3006
    dbLen = strlen(db);
1,322✔
3007
    tbName = str + pos1;
1,322✔
3008
    tbLen = len1;
1,322✔
3009
  }
3010

3011
  if (dbLen <= 0 || tbLen <= 0) {
1,322✔
3012
    return -1;
×
3013
  }
3014

3015
  if (tNameSetDbName(&name, acctId, dbName, dbLen)) {
1,322✔
3016
    return -1;
×
3017
  }
3018

3019
  if (tNameAddTbName(&name, tbName, tbLen)) {
1,322✔
3020
    return -1;
×
3021
  }
3022

3023
  char dbFName[TSDB_DB_FNAME_LEN] = {0};
1,322✔
3024
  (void)snprintf(dbFName, TSDB_DB_FNAME_LEN, "%d.%.*s", acctId, dbLen, dbName);
1,322✔
3025

3026
  STablesReq* pDb = taosHashGet(pHash, dbFName, strlen(dbFName));
1,322✔
3027
  if (pDb) {
1,322✔
3028
    if (NULL == taosArrayPush(pDb->pTables, &name)) {
×
3029
      return terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
×
3030
    }
3031
  } else {
3032
    STablesReq db;
1,322✔
3033
    db.pTables = taosArrayInit(20, sizeof(SName));
1,322✔
3034
    if (NULL == db.pTables) {
1,322✔
3035
      return terrno;
×
3036
    }
3037
    tstrncpy(db.dbFName, dbFName, TSDB_DB_FNAME_LEN);
1,322✔
3038
    if (NULL == taosArrayPush(db.pTables, &name)) {
2,644✔
3039
      return terrno;
×
3040
    }
3041
    TSC_ERR_RET(taosHashPut(pHash, dbFName, strlen(dbFName), &db, sizeof(db)));
1,322✔
3042
  }
3043

3044
  return TSDB_CODE_SUCCESS;
1,322✔
3045
}
3046

3047
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq) {
1,322✔
3048
  SHashObj* pHash = taosHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
1,322✔
3049
  if (NULL == pHash) {
1,322✔
3050
    return terrno;
×
3051
  }
3052

3053
  bool    inEscape = false;
1,322✔
3054
  int32_t code = 0;
1,322✔
3055
  void*   pIter = NULL;
1,322✔
3056

3057
  int32_t vIdx = 0;
1,322✔
3058
  int32_t vPos[2];
1,322✔
3059
  int32_t vLen[2];
1,322✔
3060

3061
  (void)memset(vPos, -1, sizeof(vPos));
1,322✔
3062
  (void)memset(vLen, 0, sizeof(vLen));
1,322✔
3063

3064
  for (int32_t i = 0;; ++i) {
6,610✔
3065
    if (0 == *(tbList + i)) {
6,610✔
3066
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
1,322✔
3067
        vLen[vIdx] = i - vPos[vIdx];
1,322✔
3068
      }
3069

3070
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
1,322✔
3071
      if (code) {
1,322✔
3072
        goto _return;
×
3073
      }
3074

3075
      break;
1,322✔
3076
    }
3077

3078
    if ('`' == *(tbList + i)) {
5,288✔
3079
      inEscape = !inEscape;
×
3080
      if (!inEscape) {
×
3081
        if (vPos[vIdx] >= 0) {
×
3082
          vLen[vIdx] = i - vPos[vIdx];
×
3083
        } else {
3084
          goto _return;
×
3085
        }
3086
      }
3087

3088
      continue;
×
3089
    }
3090

3091
    if (inEscape) {
5,288✔
3092
      if (vPos[vIdx] < 0) {
×
3093
        vPos[vIdx] = i;
×
3094
      }
3095
      continue;
×
3096
    }
3097

3098
    if ('.' == *(tbList + i)) {
5,288✔
3099
      if (vPos[vIdx] < 0) {
×
3100
        goto _return;
×
3101
      }
3102
      if (vLen[vIdx] <= 0) {
×
3103
        vLen[vIdx] = i - vPos[vIdx];
×
3104
      }
3105
      vIdx++;
×
3106
      if (vIdx >= 2) {
×
3107
        goto _return;
×
3108
      }
3109
      continue;
×
3110
    }
3111

3112
    if (',' == *(tbList + i)) {
5,288✔
3113
      if (vPos[vIdx] < 0) {
×
3114
        goto _return;
×
3115
      }
3116
      if (vLen[vIdx] <= 0) {
×
3117
        vLen[vIdx] = i - vPos[vIdx];
×
3118
      }
3119

3120
      code = appendTbToReq(pHash, vPos[0], vLen[0], vPos[1], vLen[1], tbList, acctId, dbName);
×
3121
      if (code) {
×
3122
        goto _return;
×
3123
      }
3124

3125
      (void)memset(vPos, -1, sizeof(vPos));
×
3126
      (void)memset(vLen, 0, sizeof(vLen));
×
3127
      vIdx = 0;
×
3128
      continue;
×
3129
    }
3130

3131
    if (' ' == *(tbList + i) || '\r' == *(tbList + i) || '\t' == *(tbList + i) || '\n' == *(tbList + i)) {
5,288✔
3132
      if (vPos[vIdx] >= 0 && vLen[vIdx] <= 0) {
×
3133
        vLen[vIdx] = i - vPos[vIdx];
×
3134
      }
3135
      continue;
×
3136
    }
3137

3138
    if (('a' <= *(tbList + i) && 'z' >= *(tbList + i)) || ('A' <= *(tbList + i) && 'Z' >= *(tbList + i)) ||
5,288✔
3139
        ('0' <= *(tbList + i) && '9' >= *(tbList + i)) || ('_' == *(tbList + i))) {
661✔
3140
      if (vLen[vIdx] > 0) {
5,288✔
3141
        goto _return;
×
3142
      }
3143
      if (vPos[vIdx] < 0) {
5,288✔
3144
        vPos[vIdx] = i;
1,322✔
3145
      }
3146
      continue;
5,288✔
3147
    }
3148

3149
    goto _return;
×
3150
  }
3151

3152
  int32_t dbNum = taosHashGetSize(pHash);
1,322✔
3153
  *pReq = taosArrayInit(dbNum, sizeof(STablesReq));
1,322✔
3154
  if (NULL == pReq) {
1,322✔
3155
    TSC_ERR_JRET(terrno);
×
3156
  }
3157
  pIter = taosHashIterate(pHash, NULL);
1,322✔
3158
  while (pIter) {
2,644✔
3159
    STablesReq* pDb = (STablesReq*)pIter;
1,322✔
3160
    if (NULL == taosArrayPush(*pReq, pDb)) {
2,644✔
3161
      TSC_ERR_JRET(terrno);
×
3162
    }
3163
    pIter = taosHashIterate(pHash, pIter);
1,322✔
3164
  }
3165

3166
  taosHashCleanup(pHash);
1,322✔
3167

3168
  return TSDB_CODE_SUCCESS;
1,322✔
3169

3170
_return:
×
3171

3172
  terrno = TSDB_CODE_TSC_INVALID_OPERATION;
×
3173

3174
  pIter = taosHashIterate(pHash, NULL);
×
3175
  while (pIter) {
×
3176
    STablesReq* pDb = (STablesReq*)pIter;
×
3177
    taosArrayDestroy(pDb->pTables);
×
3178
    pIter = taosHashIterate(pHash, pIter);
×
3179
  }
3180

3181
  taosHashCleanup(pHash);
×
3182

3183
  return terrno;
×
3184
}
3185

3186
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code) {
1,322✔
3187
  SSyncQueryParam* pParam = param;
1,322✔
3188
  pParam->pRequest->code = code;
1,322✔
3189

3190
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
1,322✔
3191
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3192
  }
3193
}
1,322✔
3194

3195
void syncQueryFn(void* param, void* res, int32_t code) {
1,216,477,285✔
3196
  SSyncQueryParam* pParam = param;
1,216,477,285✔
3197
  pParam->pRequest = res;
1,216,477,285✔
3198

3199
  if (pParam->pRequest) {
1,216,499,725✔
3200
    pParam->pRequest->code = code;
1,216,499,345✔
3201
    clientOperateReport(pParam->pRequest);
1,216,507,206✔
3202
  }
3203

3204
  if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
1,216,413,552✔
3205
    tscError("failed to post semaphore since %s", tstrerror(terrno));
×
3206
  }
3207
}
1,216,543,191✔
3208

3209
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
1,216,054,897✔
3210
                        int8_t source) {
3211
  if (sql == NULL || NULL == fp) {
1,216,054,897✔
3212
    terrno = TSDB_CODE_INVALID_PARA;
875✔
3213
    if (fp) {
×
3214
      fp(param, NULL, terrno);
×
3215
    }
3216

3217
    return;
×
3218
  }
3219

3220
  size_t sqlLen = strlen(sql);
1,216,061,072✔
3221
  if (sqlLen > (size_t)tsMaxSQLLength) {
1,216,061,072✔
3222
    tscError("conn:0x%" PRIx64 ", sql string exceeds max length:%d", connId, tsMaxSQLLength);
815✔
3223
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
815✔
3224
    fp(param, NULL, terrno);
815✔
3225
    return;
815✔
3226
  }
3227

3228
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, sql:%s", connId, sql);
1,216,060,257✔
3229

3230
  SRequestObj* pRequest = NULL;
1,216,060,257✔
3231
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, 0);
1,216,088,891✔
3232
  if (code != TSDB_CODE_SUCCESS) {
1,216,031,834✔
3233
    terrno = code;
3,055✔
3234
    fp(param, NULL, terrno);
3,055✔
3235
    return;
3,055✔
3236
  }
3237

3238
  code = connCheckAndUpateMetric(connId);
1,216,028,779✔
3239
  if (code != TSDB_CODE_SUCCESS) {
1,216,029,397✔
3240
    terrno = code;
378✔
3241
    fp(param, NULL, terrno);
378✔
3242
    return;
378✔
3243
  }
3244

3245
  pRequest->source = source;
1,216,029,019✔
3246
  pRequest->body.queryFp = fp;
1,216,025,279✔
3247
  doAsyncQuery(pRequest, false);
1,216,063,598✔
3248
}
3249

3250
void taosAsyncQueryImplWithReqid(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly,
235✔
3251
                                 int64_t reqid) {
3252
  if (sql == NULL || NULL == fp) {
235✔
3253
    terrno = TSDB_CODE_INVALID_PARA;
×
3254
    if (fp) {
×
3255
      fp(param, NULL, terrno);
×
3256
    }
3257

3258
    return;
8✔
3259
  }
3260

3261
  size_t sqlLen = strlen(sql);
235✔
3262
  if (sqlLen > (size_t)tsMaxSQLLength) {
235✔
3263
    tscError("conn:0x%" PRIx64 ", QID:0x%" PRIx64 ", sql string exceeds max length:%d", connId, reqid, tsMaxSQLLength);
×
3264
    terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
×
3265
    fp(param, NULL, terrno);
×
3266
    return;
×
3267
  }
3268

3269
  tscDebug("conn:0x%" PRIx64 ", taos_query execute, QID:0x%" PRIx64 ", sql:%s", connId, reqid, sql);
235✔
3270

3271
  SRequestObj* pRequest = NULL;
235✔
3272
  int32_t      code = buildRequest(connId, sql, sqlLen, param, validateOnly, &pRequest, reqid);
235✔
3273
  if (code != TSDB_CODE_SUCCESS) {
235✔
3274
    terrno = code;
8✔
3275
    fp(param, NULL, terrno);
8✔
3276
    return;
8✔
3277
  }
3278

3279
  code = connCheckAndUpateMetric(connId);
227✔
3280

3281
  if (code != TSDB_CODE_SUCCESS) {
227✔
3282
    terrno = code;
×
3283
    fp(param, NULL, terrno);
×
3284
    return;
×
3285
  }
3286

3287
  pRequest->body.queryFp = fp;
227✔
3288

3289
  doAsyncQuery(pRequest, false);
227✔
3290
}
3291

3292
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly, int8_t source) {
1,215,922,114✔
3293
  if (NULL == taos) {
1,215,922,114✔
3294
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3295
    return NULL;
×
3296
  }
3297

3298
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
1,215,922,114✔
3299
  if (NULL == param) {
1,215,950,651✔
3300
    return NULL;
×
3301
  }
3302

3303
  int32_t code = tsem_init(&param->sem, 0, 0);
1,215,950,651✔
3304
  if (TSDB_CODE_SUCCESS != code) {
1,215,913,125✔
3305
    taosMemoryFree(param);
×
3306
    return NULL;
×
3307
  }
3308

3309
  taosAsyncQueryImpl(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, source);
1,215,913,125✔
3310
  code = tsem_wait(&param->sem);
1,215,863,485✔
3311
  if (TSDB_CODE_SUCCESS != code) {
1,215,931,875✔
3312
    taosMemoryFree(param);
×
3313
    return NULL;
×
3314
  }
3315
  code = tsem_destroy(&param->sem);
1,215,931,875✔
3316
  if (TSDB_CODE_SUCCESS != code) {
1,215,939,418✔
3317
    tscError("failed to destroy semaphore since %s", tstrerror(code));
×
3318
  }
3319

3320
  SRequestObj* pRequest = NULL;
1,215,949,268✔
3321
  if (param->pRequest != NULL) {
1,215,949,268✔
3322
    param->pRequest->syncQuery = true;
1,215,946,011✔
3323
    pRequest = param->pRequest;
1,215,949,804✔
3324
    param->pRequest->inCallback = false;
1,215,946,373✔
3325
  }
3326
  taosMemoryFree(param);
1,215,945,011✔
3327

3328
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3329
  //          *(int64_t*)taos, pRequest);
3330

3331
  return pRequest;
1,215,944,337✔
3332
}
3333

3334
TAOS_RES* taosQueryImplWithReqid(TAOS* taos, const char* sql, bool validateOnly, int64_t reqid) {
235✔
3335
  if (NULL == taos) {
235✔
3336
    terrno = TSDB_CODE_TSC_DISCONNECTED;
×
3337
    return NULL;
×
3338
  }
3339

3340
  SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
235✔
3341
  if (param == NULL) {
235✔
3342
    return NULL;
×
3343
  }
3344
  int32_t code = tsem_init(&param->sem, 0, 0);
235✔
3345
  if (TSDB_CODE_SUCCESS != code) {
235✔
3346
    taosMemoryFree(param);
×
3347
    return NULL;
×
3348
  }
3349

3350
  taosAsyncQueryImplWithReqid(*(int64_t*)taos, sql, syncQueryFn, param, validateOnly, reqid);
235✔
3351
  code = tsem_wait(&param->sem);
235✔
3352
  if (TSDB_CODE_SUCCESS != code) {
235✔
3353
    taosMemoryFree(param);
×
3354
    return NULL;
×
3355
  }
3356
  SRequestObj* pRequest = NULL;
235✔
3357
  if (param->pRequest != NULL) {
235✔
3358
    param->pRequest->syncQuery = true;
227✔
3359
    pRequest = param->pRequest;
227✔
3360
  }
3361
  taosMemoryFree(param);
235✔
3362

3363
  // tscDebug("QID:0x%" PRIx64 ", taos_query end, conn:0x%" PRIx64 ", res:%p", pRequest ? pRequest->requestId : 0,
3364
  //   *(int64_t*)taos, pRequest);
3365

3366
  return pRequest;
235✔
3367
}
3368

3369
static void fetchCallback(void* pResult, void* param, int32_t code) {
304,064,708✔
3370
  SRequestObj* pRequest = (SRequestObj*)param;
304,064,708✔
3371

3372
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
304,064,708✔
3373

3374
  tscDebug("req:0x%" PRIx64 ", enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code,
304,064,705✔
3375
           tstrerror(code), pRequest->requestId);
3376

3377
  pResultInfo->pData = pResult;
304,064,705✔
3378
  pResultInfo->numOfRows = 0;
304,064,645✔
3379

3380
  if (code != TSDB_CODE_SUCCESS) {
304,064,494✔
3381
    pRequest->code = code;
×
3382
    taosMemoryFreeClear(pResultInfo->pData);
×
3383
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
3384
    return;
×
3385
  }
3386

3387
  if (pRequest->code != TSDB_CODE_SUCCESS) {
304,064,494✔
3388
    taosMemoryFreeClear(pResultInfo->pData);
×
3389
    pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pRequest->code);
×
3390
    return;
×
3391
  }
3392

3393
  pRequest->code = setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData,
309,253,649✔
3394
                                         pResultInfo->convertUcs4, pRequest->stmtBindVersion > 0);
304,064,687✔
3395
  if (pRequest->code != TSDB_CODE_SUCCESS) {
304,063,490✔
3396
    pResultInfo->numOfRows = 0;
66✔
3397
    tscError("req:0x%" PRIx64 ", fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self,
66✔
3398
             tstrerror(pRequest->code), pRequest->requestId);
3399
  } else {
3400
    tscDebug(
304,064,594✔
3401
        "req:0x%" PRIx64 ", fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
3402
        pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed, pRequest->requestId);
3403

3404
    STscObj*            pTscObj = pRequest->pTscObj;
304,065,156✔
3405
    SAppClusterSummary* pActivity = &pTscObj->pAppInfo->summary;
304,064,655✔
3406
    (void)atomic_add_fetch_64((int64_t*)&pActivity->fetchBytes, pRequest->body.resInfo.payloadLen);
304,064,634✔
3407
  }
3408

3409
  pRequest->body.fetchFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, pResultInfo->numOfRows);
304,064,636✔
3410
}
3411

3412
void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param) {
341,540,794✔
3413
  pRequest->body.fetchFp = fp;
341,540,794✔
3414
  ((SSyncQueryParam*)pRequest->body.interParam)->userParam = param;
341,540,794✔
3415

3416
  SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
341,540,815✔
3417

3418
  // this query has no results or error exists, return directly
3419
  if (taos_num_fields(pRequest) == 0 || pRequest->code != TSDB_CODE_SUCCESS) {
341,540,815✔
3420
    pResultInfo->numOfRows = 0;
×
3421
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
×
3422
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
×
3423

3424
    return;
2,194,059✔
3425
  }
3426

3427
  // all data has returned to App already, no need to try again
3428
  if (pResultInfo->completed) {
341,540,815✔
3429
    CLIENT_UPDATE_REQUEST_PHASE_IF_CHANGED(pRequest, QUERY_PHASE_FETCH_RETURNED);
74,952,188✔
3430
    // it is a local executed query, no need to do async fetch
3431
    if (QUERY_EXEC_MODE_SCHEDULE != pRequest->body.execMode) {
37,476,094✔
3432
      if (pResultInfo->localResultFetched) {
1,695,804✔
3433
        pResultInfo->numOfRows = 0;
847,902✔
3434
        pResultInfo->current = 0;
847,902✔
3435
      } else {
3436
        pResultInfo->localResultFetched = true;
847,902✔
3437
      }
3438
    } else {
3439
      pResultInfo->numOfRows = 0;
35,780,290✔
3440
    }
3441

3442
    pRequest->body.fetchFp(param, pRequest, pResultInfo->numOfRows);
37,476,094✔
3443
    return;
37,476,094✔
3444
  }
3445

3446
  SSchedulerReq req = {
304,064,721✔
3447
      .syncReq = false,
3448
      .fetchFp = fetchCallback,
3449
      .cbParam = pRequest,
3450
  };
3451

3452
  int32_t code = schedulerFetchRows(pRequest->body.queryJob, &req);
304,064,721✔
3453
  if (TSDB_CODE_SUCCESS != code) {
304,064,721✔
3454
    tscError("0x%" PRIx64 " failed to schedule fetch rows", pRequest->requestId);
×
3455
    // pRequest->body.fetchFp(param, pRequest, code);
3456
  }
3457
}
3458

3459
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
1,216,474,613✔
3460
  pRequest->inCallback = true;
1,216,474,613✔
3461

3462
  int64_t this = pRequest->self;
1,216,514,321✔
3463
  if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery &&
1,216,481,589✔
3464
      (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
27,010✔
3465
    code = TSDB_CODE_SUCCESS;
×
3466
    pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
×
3467
    if (pRequest->code == TSDB_CODE_PAR_TABLE_NOT_EXIST || pRequest->code == TSDB_CODE_TDB_TABLE_NOT_EXIST) {
×
3468
      pRequest->code = TSDB_CODE_SUCCESS;
15,718✔
3469
      if (pRequest->msgBuf != NULL && pRequest->msgBufLen > 0) {
×
3470
        pRequest->msgBuf[0] = '\0';
×
3471
      }
3472
    }
3473
  }
3474

3475
  tscDebug("QID:0x%" PRIx64 ", taos_query end, req:0x%" PRIx64 ", res:%p", pRequest->requestId, pRequest->self,
1,216,471,320✔
3476
           pRequest);
3477

3478
  if (pRequest->body.queryFp != NULL) {
1,216,471,841✔
3479
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
1,216,501,605✔
3480
  }
3481

3482
  SRequestObj* pReq = acquireRequest(this);
1,216,531,196✔
3483
  if (pReq != NULL) {
1,216,583,764✔
3484
    pReq->inCallback = false;
1,213,227,293✔
3485
    (void)releaseRequest(this);
1,213,227,845✔
3486
  }
3487
}
1,216,552,560✔
3488

3489
int32_t clientParseSql(void* param, const char* dbName, const char* sql, bool parseOnly, const char* effectiveUser,
582,025✔
3490
                       SParseSqlRes* pRes) {
3491
#ifndef TD_ENTERPRISE
3492
  return TSDB_CODE_SUCCESS;
3493
#else
3494
  return clientParseSqlImpl(param, dbName, sql, parseOnly, effectiveUser, pRes);
582,025✔
3495
#endif
3496
}
3497

3498
void updateConnAccessInfo(SConnAccessInfo* pInfo) {
1,310,624,939✔
3499
  if (pInfo == NULL) {
1,310,624,939✔
3500
    return;
×
3501
  }
3502
  int64_t ts = taosGetTimestampMs();
1,310,740,015✔
3503
  if (pInfo->startTime == 0) {
1,310,740,015✔
3504
    pInfo->startTime = ts;
94,658,156✔
3505
  }
3506
  pInfo->lastAccessTime = ts;
1,310,729,607✔
3507
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc