• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4892

20 Dec 2025 01:15PM UTC coverage: 65.571% (+0.02%) from 65.549%
#4892

push

travis-ci

web-flow
feat: support taos_connect_with func (#33952)

* feat: support taos_connect_with

* refactor: enhance connection options and add tests for taos_set_option and taos_connect_with

* fix: handle NULL keys and values in taos_connect_with options

* fix: revert TAOSWS_GIT_TAG to default value "main"

* docs: add TLS configuration options for WebSocket connections in documentation

* docs: modify zh docs and add en docs

* chore: update taos.cfg

* docs: add examples

* docs: add error handling for connection failure in example code

2 of 82 new or added lines in 3 files covered. (2.44%)

527 existing lines in 120 files now uncovered.

182859 of 278870 relevant lines covered (65.57%)

104634355.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.16
/source/client/src/clientMsgHandler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "cmdnodes.h"
21
#include "command.h"
22
#include "os.h"
23
#include "query.h"
24
#include "systable.h"
25
#include "tdatablock.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tname.h"
29
#include "tversion.h"
30

31
extern SClientHbMgr clientHbMgr;
32

33
/*
 * Store `code` both on the request object and in `terrno`, so the result is
 * visible through either channel.
 */
static void setErrno(SRequestObj* pRequest, int32_t code) {
  terrno = code;
  pRequest->code = code;
}
5,118,107✔
37

38
/*
 * Default response handler.
 *
 * Records `code` on the request, drops cached table metadata when the request
 * type asks for it, releases the message buffers and finally completes the
 * request: through the registered query callback when there is one, otherwise
 * by posting the response semaphore.
 *
 * param  the SRequestObj this response belongs to
 * pMsg   response buffers; pData and pEpSet are freed here
 * code   result code of the response; returned unchanged
 */
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = (SRequestObj*)param;

  setErrno(pReq, code);

  // Request types flagged by NEED_CLIENT_RM_TBLMETA_REQ have the metadata of
  // their target tables removed; a failure is only logged.
  if ((NEED_CLIENT_RM_TBLMETA_REQ(pReq->type)) &&
      removeMeta(pReq->pTscObj, pReq->targetTableList, IS_VIEW_REQUEST(pReq->type)) != 0) {
    tscError("failed to remove meta data for table");
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  if (NULL == pReq->body.queryFp) {
    // No callback registered: signal whoever waits on the response semaphore.
    if (tsem_post(&pReq->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pReq, code);
  return code;
}
59

60
/*
 * Handle the response to a connect request.
 *
 * param  heap-allocated int64_t holding the request reference id; freed here
 * pMsg   response buffers; pData and pEpSet are freed here
 * code   result code delivered with the response
 *
 * On success the connection object (pTscObj) and its application instance
 * info are updated from the decoded SConnectRsp and the connection is
 * registered with the heartbeat manager. Every path that acquired the request
 * posts the response semaphore and releases the request again (label End);
 * when the request could not be acquired only the buffers are freed (label
 * EXIT). Returns the final result code.
 */
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = acquireRequest(*(int64_t*)param);
  if (NULL == pRequest) {
    // Nothing to notify: skip the semaphore post and only free the buffers.
    goto EXIT;
  }

  if (code != TSDB_CODE_SUCCESS) {
    goto End;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  if (NULL == pTscObj->pAppInfo) {
    code = TSDB_CODE_TSC_DISCONNECTED;
    goto End;
  }

  if (pTscObj->connType == CONN_TYPE__AUTH_TEST) {
    // auth test connection, no need to process connect rsp
    goto End;
  }

  SConnectRsp connectRsp = {0};
  if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) {
    // A payload that cannot be decoded is reported as an invalid version.
    code = TSDB_CODE_TSC_INVALID_VERSION;
    goto End;
  }

  if ((code = taosCheckVersionCompatibleFromStr(td_version, connectRsp.sVer, 3)) != 0) {
    tscError("version not compatible. client version:%s, server version:%s", td_version, connectRsp.sVer);
    goto End;
  }

  // Reject the connection when client and server clocks differ by more than
  // tsTimestampDeltaLimit seconds.
  int32_t now = taosGetTimestampSec();
  int32_t delta = abs(now - connectRsp.svrTimestamp);
  if (delta > tsTimestampDeltaLimit) {
    code = TSDB_CODE_TIME_UNSYNCED;
    tscError("time diff:%ds is too big", delta);
    goto End;
  }

  if (connectRsp.epSet.numOfEps == 0) {
    code = TSDB_CODE_APP_ERROR;
    goto End;
  }

  // With a single dnode and a single locally known endpoint, the local epset
  // is kept and only a default address mapping (local fqdn -> fqdn in the
  // response) is installed on the transporter. In every other case the local
  // mnode epset is replaced when it differs from the one in the response.
  int    updateEpSet = 1;
  SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  if (connectRsp.dnodeNum == 1) {
    SEpSet dstEpSet = connectRsp.epSet;
    if (srcEpSet.numOfEps == 1) {
      if (rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
                            dstEpSet.eps[dstEpSet.inUse].fqdn) != 0) {
        tscError("failed to set default addr for rpc");
      }
      updateEpSet = 0;
    }
  }
  if (updateEpSet == 1 && !isEpsetEqual(&srcEpSet, &connectRsp.epSet)) {
    SEpSet corEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

    SEpSet* pOrig = &corEpSet;
    SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
    SEp*    pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,
             pOrigEp->fqdn, pOrigEp->port, connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn,
             pNewEp->port);
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
  }

  for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
    tscDebug("QID:0x%" PRIx64 ", epSet.fqdn[%d]:%s port:%d, conn:0x%" PRIx64, pRequest->requestId, i,
             connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
  }

  // Copy the per-connection attributes from the response.
  pTscObj->sysInfo = connectRsp.sysInfo;
  pTscObj->connId = connectRsp.connId;
  pTscObj->acctId = connectRsp.acctId;
  tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer));
  tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer));

  // update the appInstInfo
  pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
  pTscObj->pAppInfo->serverCfg.monitorParas = connectRsp.monitorParas;
  pTscObj->pAppInfo->serverCfg.enableAuditDelete = connectRsp.enableAuditDelete;
  pTscObj->pAppInfo->serverCfg.enableAuditSelect = connectRsp.enableAuditSelect;
  pTscObj->pAppInfo->serverCfg.enableAuditInsert = connectRsp.enableAuditInsert;
  pTscObj->pAppInfo->serverCfg.auditLevel = connectRsp.auditLevel;
  tscDebug("monitor paras from connect rsp, clusterId:0x%" PRIx64 ", threshold:%d scope:%d",
           connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope);
  lastClusterId = connectRsp.clusterId;

  pTscObj->connType = connectRsp.connType;
  pTscObj->passInfo.ver = connectRsp.passVer;
  pTscObj->authVer = connectRsp.authVer;
  pTscObj->whiteListInfo.ver = connectRsp.whiteListVer;

  // First connection seen for this cluster id: index the app instance by
  // cluster id and, when built with USE_MONITOR, start the monitor state for
  // that cluster.
  if (taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL) {
    if (taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo,
                    POINTER_BYTES) != 0) {
      tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
    } else {
#ifdef USE_MONITOR
      MonitorSlowLogData data = {0};
      data.clusterId = pTscObj->pAppInfo->clusterId;
      data.type = SLOW_LOG_READ_BEGINNIG;
      (void)monitorPutData2MonitorQueue(data);  // ignore
      monitorClientSlowQueryInit(connectRsp.clusterId);
      monitorClientSQLReqInit(connectRsp.clusterId);
#endif
    }
  }

  // Register the connection with its heartbeat manager while holding
  // clientHbMgr.lock. A missing manager fails the connect; the lock is
  // released on both branches before leaving this section.
  (void)taosThreadMutexLock(&clientHbMgr.lock);
  SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
  if (pAppHbMgr) {
    if (hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType) != 0) {
      tscError("QID:0x%" PRIx64 ", failed to register conn to hbMgr", pRequest->requestId);
    }
  } else {
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    code = TSDB_CODE_TSC_DISCONNECTED;
    goto End;
  }
  (void)taosThreadMutexUnlock(&clientHbMgr.lock);

  tscDebug("QID:0x%" PRIx64 ", clusterId:0x%" PRIx64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
           pTscObj->pAppInfo->numOfConns);

End:
  // Reached only with a non-NULL pRequest: record a failure, post the response
  // semaphore and drop the reference taken by acquireRequest().
  if (code != 0) {
    setErrno(pRequest, code);
  }
  if (tsem_post(&pRequest->body.rspSem) != 0) {
    tscError("failed to post semaphore");
  }

  if (pRequest) {
    (void)releaseRequest(pRequest->self);
  }

EXIT:
  // The callback owns param and the message buffers on every path.
  taosMemoryFree(param);
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  return code;
}
207

208
/*
 * Allocate a zeroed SMsgSendInfo and populate it from `pRequest`.
 *
 * The message targets the mnode, carries the request's message body and uses
 * the response handler that getMsgRspHandle() returns for the request type.
 * Returns NULL when the allocation fails.
 */
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL != pInfo) {
    // Identity of the originating request.
    pInfo->requestObjRefId = pRequest->self;
    pInfo->requestId = pRequest->requestId;
    pInfo->param = pRequest;

    // Routing and payload.
    pInfo->msgType = pRequest->type;
    pInfo->target.type = TARGET_TYPE_MNODE;
    pInfo->msgInfo = pRequest->body.requestMsg;

    // Response handler selected by message type.
    pInfo->fp = getMsgRspHandle(pRequest->type);
  }
  return pInfo;
}
221

222
/*
 * Handle the response to a create-database request.
 *
 * The message buffers are released first. On failure the code is recorded on
 * the request. On success the vgroup info of the information-schema and
 * performance-schema databases is refreshed in the catalog; refresh failures
 * are only logged. The request is then completed via its callback or, without
 * one, by posting the response semaphore. Returns the final code, which is the
 * catalogGetHandle() result when the incoming code was success.
 */
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  // todo rsp with the vnode id list
  SRequestObj* pReq = param;

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (TSDB_CODE_SUCCESS != code) {
    setErrno(pReq, code);
  } else {
    struct SCatalog* pCtg = NULL;
    code = catalogGetHandle(pReq->pTscObj->pAppInfo->clusterId, &pCtg);
    if (TSDB_CODE_SUCCESS == code) {
      STscObj*         pTsc = pReq->pTscObj;
      SRequestConnInfo conn = {.pTrans = pTsc->pAppInfo->pTransporter,
                               .requestId = pReq->requestId,
                               .requestObjRefId = pReq->self,
                               .mgmtEps = getEpSet_s(&pTsc->pAppInfo->mgmtEp)};

      // Both system databases are refreshed, in this order.
      const char* const sysDbs[] = {TSDB_INFORMATION_SCHEMA_DB, TSDB_PERFORMANCE_SCHEMA_DB};
      for (int32_t idx = 0; idx < (int32_t)(sizeof(sysDbs) / sizeof(sysDbs[0])); ++idx) {
        char dbFName[TSDB_DB_FNAME_LEN];
        (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTsc->acctId, sysDbs[idx]);
        if (catalogRefreshDBVgInfo(pCtg, &conn, dbFName) != 0) {
          tscError("QID:0x%" PRIx64 ", failed to refresh db vg info", pReq->requestId);
        }
      }
    }
  }

  if (!pReq->body.queryFp) {
    if (tsem_post(&pReq->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pReq, code);
  return code;
}
260

261
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,602,827✔
262
  SRequestObj* pRequest = param;
2,602,827✔
263
  if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
2,602,827✔
264
      TSDB_CODE_MND_DB_IN_DROPPING == code) {
265
    SUseDbRsp usedbRsp = {0};
1,958✔
266
    if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
1,958✔
267
      tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
1,958✔
268
    }
269
    struct SCatalog* pCatalog = NULL;
1,958✔
270

271
    if (usedbRsp.vgVersion >= 0) {  // cached in local
1,958✔
272
      int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
1,958✔
273
      int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
1,958✔
274
      if (code1 != TSDB_CODE_SUCCESS) {
1,958✔
275
        tscWarn("QID:0x%" PRIx64 ", catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
×
276
                tstrerror(code1));
277
      } else {
278
        if (catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid) != 0) {
1,958✔
279
          tscError("QID:0x%" PRIx64 ", catalogRemoveDB failed, db:%s, uid:%" PRId64, pRequest->requestId, usedbRsp.db,
×
280
                   usedbRsp.uid);
281
        }
282
      }
283
    }
284
    tFreeSUsedbRsp(&usedbRsp);
1,958✔
285
  }
286

287
  if (code != TSDB_CODE_SUCCESS) {
2,602,827✔
288
    taosMemoryFree(pMsg->pData);
4,506✔
289
    taosMemoryFree(pMsg->pEpSet);
4,506✔
290
    setErrno(pRequest, code);
4,506✔
291

292
    if (pRequest->body.queryFp != NULL) {
4,506✔
293
      doRequestCallback(pRequest, pRequest->code);
4,506✔
294

295
    } else {
296
      if (tsem_post(&pRequest->body.rspSem) != 0) {
×
297
        tscError("failed to post semaphore");
×
298
      }
299
    }
300

301
    return code;
4,506✔
302
  }
303

304
  SUseDbRsp usedbRsp = {0};
2,598,321✔
305
  if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
2,598,321✔
306
    tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
×
307
  }
308

309
  if (strlen(usedbRsp.db) == 0) {
2,598,321✔
310
    taosMemoryFree(pMsg->pData);
×
311
    taosMemoryFree(pMsg->pEpSet);
×
312

313
    if (usedbRsp.errCode != 0) {
×
314
      return usedbRsp.errCode;
×
315
    } else {
316
      return TSDB_CODE_APP_ERROR;
×
317
    }
318
  }
319

320
  tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
2,598,321✔
321
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
5,058,225✔
322
    SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
2,459,870✔
323
    if (pInfo == NULL) {
2,459,870✔
324
      continue;
×
325
    }
326
    tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
2,459,870✔
327
    for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
5,184,669✔
328
      tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
2,724,765✔
329
    }
330
  }
331

332
  SName name = {0};
2,598,355✔
333
  if (tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB) != TSDB_CODE_SUCCESS) {
2,598,355✔
334
    tscError("QID:0x%" PRIx64 ", failed to parse db name:%s", pRequest->requestId, usedbRsp.db);
×
335
  }
336

337
  SUseDbOutput output = {0};
2,598,287✔
338
  code = queryBuildUseDbOutput(&output, &usedbRsp);
2,598,287✔
339
  if (code != 0) {
2,598,321✔
340
    terrno = code;
×
341
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
×
342

343
    tscError("QID:0x%" PRIx64 ", failed to build use db output since %s", pRequest->requestId, terrstr());
×
344
  } else if (output.dbVgroup && output.dbVgroup->vgHash) {
2,598,321✔
345
    struct SCatalog* pCatalog = NULL;
1,139,693✔
346

347
    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,139,693✔
348
    if (code1 != TSDB_CODE_SUCCESS) {
1,139,693✔
349
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
×
350
              tstrerror(code1));
351
    } else {
352
      if (catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup) != 0) {
1,139,693✔
353
        tscError("QID:0x%" PRIx64 ", failed to update db vg info, db:%s, dbId:%" PRId64, pRequest->requestId, output.db,
×
354
                 output.dbId);
355
      }
356
      output.dbVgroup = NULL;
1,139,693✔
357
    }
358
  }
359

360
  taosMemoryFreeClear(output.dbVgroup);
2,598,321✔
361
  tFreeSUsedbRsp(&usedbRsp);
2,598,287✔
362

363
  char db[TSDB_DB_NAME_LEN] = {0};
2,598,321✔
364
  if (tNameGetDbName(&name, db) != TSDB_CODE_SUCCESS) {
2,598,321✔
365
    tscError("QID:0x%" PRIx64 ", failed to get db name since %s", pRequest->requestId, tstrerror(code));
×
366
  }
367

368
  setConnectionDB(pRequest->pTscObj, db);
2,598,321✔
369

370
  taosMemoryFree(pMsg->pData);
2,598,321✔
371
  taosMemoryFree(pMsg->pEpSet);
2,598,287✔
372

373
  if (pRequest->body.queryFp != NULL) {
2,598,287✔
374
    doRequestCallback(pRequest, pRequest->code);
2,598,287✔
375
  } else {
376
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
377
      tscError("failed to post semaphore");
×
378
    }
379
  }
380
  return 0;
2,598,321✔
381
}
382

383
/*
 * Handle the response to a create-super-table request.
 *
 * param  the SRequestObj this response belongs to; must not be NULL
 * pMsg   response buffers; must not be NULL, pData and pEpSet are freed here
 * code   result code delivered with the response
 *
 * On success the optional SMCreateStbRsp payload is decoded and its table meta
 * is stored as the request's execution result. When a query callback is
 * registered, the meta is additionally applied to the catalog before the
 * callback runs; otherwise the response semaphore is posted.
 *
 * Returns TSDB_CODE_TSC_INVALID_INPUT for a NULL argument, otherwise the final
 * result code (response code, decode error or catalog error).
 */
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    tscError("processCreateSTableRsp: invalid input param, pMsg is NULL");
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  if (param == NULL) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
    tscError("processCreateSTableRsp: invalid input param, param is NULL");
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SRequestObj* pRequest = param;

  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  } else {
    SMCreateStbRsp createRsp = {0};
    SDecoder       coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    // With an empty payload (pMsg->len == 0) there is nothing to decode:
    // createRsp stays zeroed and the execution result below is NULL.
    if (pMsg->len > 0) {
      code = tDecodeSMCreateStbRsp(&coder, &createRsp);
      if (code != TSDB_CODE_SUCCESS) {
        setErrno(pRequest, code);
      }
    }
    tDecoderClear(&coder);

    pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
    pRequest->body.resInfo.execRes.res = createRsp.pMeta;
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  if (pRequest->body.queryFp != NULL) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;

    if (code == TSDB_CODE_SUCCESS) {
      SCatalog* pCatalog = NULL;
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
      // Only touch the catalog when the handle was actually obtained, so a
      // NULL catalog is never passed on and the lookup error is not lost.
      if (ret == TSDB_CODE_SUCCESS && pRes->res != NULL) {
        ret = handleCreateTbExecRes(pRes->res, pCatalog);
      }

      if (ret != TSDB_CODE_SUCCESS) {
        code = ret;
      }
    }

    doRequestCallback(pRequest, code);
  } else {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  }
  return code;
}
441

442
/*
 * Handle the response to a drop-database request.
 *
 * On failure the code is recorded on the request. On success the SDropDbRsp
 * payload is decoded, the database is removed from the catalog and the vgroup
 * info of the information-schema and performance-schema databases is
 * refreshed; failures of those steps are only logged. The buffers are then
 * freed and the request is completed via its callback or, without one, by
 * posting the response semaphore. Returns the final code, which is the
 * catalogGetHandle() result when the incoming code was success.
 */
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SDropDbRsp dropRsp = {0};
    if (tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropRsp) != 0) {
      tscError("QID:0x%" PRIx64 ", deserialize SDropDbRsp failed", pReq->requestId);
    }

    struct SCatalog* pCtg = NULL;
    code = catalogGetHandle(pReq->pTscObj->pAppInfo->clusterId, &pCtg);
    if (TSDB_CODE_SUCCESS == code) {
      if (catalogRemoveDB(pCtg, dropRsp.db, dropRsp.uid) != 0) {
        tscError("QID:0x%" PRIx64 ", failed to remove db:%s", pReq->requestId, dropRsp.db);
      }

      STscObj*         pTsc = pReq->pTscObj;
      SRequestConnInfo conn = {.pTrans = pTsc->pAppInfo->pTransporter,
                               .requestId = pReq->requestId,
                               .requestObjRefId = pReq->self,
                               .mgmtEps = getEpSet_s(&pTsc->pAppInfo->mgmtEp)};

      // Both system databases are refreshed, in this order.
      const char* const sysDbs[] = {TSDB_INFORMATION_SCHEMA_DB, TSDB_PERFORMANCE_SCHEMA_DB};
      for (int32_t idx = 0; idx < (int32_t)(sizeof(sysDbs) / sizeof(sysDbs[0])); ++idx) {
        char dbFName[TSDB_DB_FNAME_LEN] = {0};
        (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTsc->acctId, sysDbs[idx]);
        if (catalogRefreshDBVgInfo(pCtg, &conn, dbFName) != 0) {
          tscError("QID:0x%" PRIx64 ", failed to refresh db vg info, db:%s", pReq->requestId, dbFName);
        }
      }
    }
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    if (tsem_post(&pReq->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pReq, code);
  return code;
}
487

488
/*
 * Handle the response to an alter-super-table request.
 *
 * param  the SRequestObj this response belongs to
 * pMsg   response buffers; pData and pEpSet are freed here
 * code   result code delivered with the response
 *
 * On success the optional SMAlterStbRsp payload is decoded and its table meta
 * is stored as the request's execution result. When a query callback is
 * registered, the meta is additionally applied to the catalog before the
 * callback runs; otherwise the response semaphore is posted.
 *
 * Returns the final result code (response code, decode error or catalog
 * error).
 */
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  } else {
    SMAlterStbRsp alterRsp = {0};
    SDecoder      coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    // With an empty payload (pMsg->len == 0) there is nothing to decode:
    // alterRsp stays zeroed and the execution result below is NULL.
    if (pMsg->len > 0) {
      code = tDecodeSMAlterStbRsp(&coder, &alterRsp);
      if (code != TSDB_CODE_SUCCESS) {
        setErrno(pRequest, code);
      }
    }
    tDecoderClear(&coder);

    pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
    pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (pRequest->body.queryFp != NULL) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;

    if (code == TSDB_CODE_SUCCESS) {
      SCatalog* pCatalog = NULL;
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
      // Only touch the catalog when the handle was actually obtained, so a
      // NULL catalog is never passed on and the lookup error is not lost.
      if (ret == TSDB_CODE_SUCCESS && pRes->res != NULL) {
        ret = handleAlterTbExecRes(pRes->res, pCatalog);
      }

      if (ret != TSDB_CODE_SUCCESS) {
        code = ret;
      }
    }

    doRequestCallback(pRequest, code);
  } else {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  }
  return code;
}
534

535
/*
 * Build a data block with one row per entry of `pVars` and five VARCHAR
 * columns: name, value, scope, category and info.
 *
 * pVars  array of SVariablesInfo
 * block  out: receives the new block on success and is left untouched on
 *        failure; the caller releases it with blockDataDestroy()
 *
 * Returns 0 on success, otherwise the error code of the failing step. On
 * failure the partially built block, including any column buffers already
 * allocated, is destroyed here.
 */
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // Column schema: five VARCHAR columns that differ only in width.
  const int32_t colBytes[] = {SHOW_VARIABLES_RESULT_FIELD1_LEN, SHOW_VARIABLES_RESULT_FIELD2_LEN,
                              SHOW_VARIABLES_RESULT_FIELD3_LEN, SHOW_VARIABLES_RESULT_FIELD4_LEN,
                              SHOW_VARIABLES_RESULT_FIELD5_LEN};
  for (int32_t k = 0; k < (int32_t)(sizeof(colBytes) / sizeof(colBytes[0])); ++k) {
    SColumnInfoData infoData = {0};
    infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
    infoData.info.bytes = colBytes[k];
    TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
  }

  int32_t numOfCfg = taosArrayGetSize(pVars);
  code = blockDataEnsureCapacity(pBlock, numOfCfg);
  TSDB_CHECK_CODE(code, line, END);

  // Fill row i from pVars[i]; c walks the columns left to right.
  for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
    SVariablesInfo* pInfo = taosArrayGet(pVars, i);
    TSDB_CHECK_NULL(pInfo, code, line, END, terrno);

    char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, name, false);
    TSDB_CHECK_CODE(code, line, END);

    char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, value, false);
    TSDB_CHECK_CODE(code, line, END);

    char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, scope, false);
    TSDB_CHECK_CODE(code, line, END);

    char category[TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(category, pInfo->category, TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, category, false);
    TSDB_CHECK_CODE(code, line, END);

    char info[TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(info, pInfo->info, TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, info, false);
    TSDB_CHECK_CODE(code, line, END);
  }

  pBlock->info.rows = numOfCfg;

  *block = pBlock;
  return code;

END:
  tscError("%s failed at line %d since %s", __func__, line, tstrerror(code));
  // pBlock is NULL when the very first allocation failed. Otherwise destroy
  // the whole block so column buffers allocated by blockDataEnsureCapacity()
  // are released together with the column array.
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
  }
  return code;
}
619

620
/*
 * Encode the variables in `pVars` as an SRetrieveTableRsp.
 *
 * pVars  array of SVariablesInfo
 * pRsp   out: receives a newly allocated response on success; when the
 *        allocation or the encoding fails it is set to NULL. If building the
 *        intermediate block fails it is left untouched.
 *
 * The response buffer holds the fixed header, a PAYLOAD_PREFIX_LEN prefix and
 * the encoded block. Multi-byte header fields are stored in network byte
 * order. Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildShowVariablesBlock(pVars, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;

  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);

  int32_t len = 0;
  // Test the host-order row count. The header field has already been
  // byte-swapped by htobe64(), so its sign and magnitude are not meaningful
  // on little-endian hosts.
  if (pBlock->info.rows > 0) {
    len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SHOW_VARIABLES_RESULT_COLS);
    if (len < 0) {
      uError("buildShowVariablesRsp error, len:%d", len);
      code = terrno;
      goto _exit;
    }
    SET_PAYLOAD_LEN((*pRsp)->data, len, len);

    int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
    (*pRsp)->payloadLen = htonl(payloadLen);
    (*pRsp)->compLen = htonl(payloadLen);

    // The encoded size must match what blockGetEncodeSize() predicted.
    if ((size_t)payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
      uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, payloadLen,
             (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
      code = TSDB_CODE_TSC_INVALID_INPUT;
      goto _exit;
    }
  }

  blockDataDestroy(pBlock);
  pBlock = NULL;

  return TSDB_CODE_SUCCESS;
_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
680

681
/*
 * Handle the response to a show-variables request.
 *
 * On success the payload is decoded, re-encoded as a retrieve-table response
 * and installed as the request's query result; if any of those steps fails
 * the intermediate response is freed and the failing code becomes the result.
 * On failure the code is recorded on the request. The buffers are then freed
 * and the request is completed via its callback or, without one, by posting
 * the response semaphore. Returns the final code.
 */
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SShowVariablesRsp  varsRsp = {0};
    SRetrieveTableRsp* pRetrieve = NULL;

    // decode -> build the retrieve response -> attach it to the request;
    // each stage runs only if the previous one succeeded.
    code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &varsRsp);
    if (TSDB_CODE_SUCCESS == code) {
      code = buildShowVariablesRsp(varsRsp.variables, &pRetrieve);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = setQueryResultFromRsp(&pReq->body.resInfo, pRetrieve, false, pReq->stmtBindVersion > 0);
    }

    if (code != 0) {
      pReq->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieve);
    }
    tFreeSShowVariablesRsp(&varsRsp);
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    if (tsem_post(&pReq->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pReq, code);
  return code;
}
715

716
/*
 * Build the one-row, three-column result block for a compact-db response.
 *
 * Columns, in order:
 *   0: result - VARCHAR, COMPACT_DB_RESULT_FIELD1_LEN bytes
 *   1: id     - INT
 *   2: reason - VARCHAR, COMPACT_DB_RESULT_FIELD3_LEN bytes
 *
 * When pRsp->bAccepted is set the row is ("accepted", pRsp->compactId, "success"),
 * otherwise it is ("rejected", NULL, "compaction is ongoing").
 *
 * On success *block receives the new block and TSDB_CODE_SUCCESS is returned.
 * On failure *block is left untouched, everything allocated here is released
 * and the error code is returned.
 */
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // Column schema: result (varchar), id (int), reason (varchar).
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  infoData.info.type = TSDB_DATA_TYPE_INT;
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);

  char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
  char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
  if (pRsp->bAccepted) {
    STR_TO_VARSTR(result, "accepted");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
    TSDB_CHECK_CODE(code, line, END);
    STR_TO_VARSTR(reason, "success");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  } else {
    STR_TO_VARSTR(result, "rejected");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    // A rejected request has no compact id to report.
    colDataSetNULL(pIdCol, 0);
    STR_TO_VARSTR(reason, "compaction is ongoing");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  }
  pBlock->info.rows = 1;

  *block = pBlock;

  return TSDB_CODE_SUCCESS;
END:
  // pBlock is still NULL when the very first allocation failed. Otherwise
  // release it through blockDataDestroy(), the routine the caller uses for
  // this block, rather than freeing the struct and then reading
  // pBlock->pDataBlock out of the freed memory.
  // NOTE(review): presumably this also releases any column buffers set up by
  // blockDataEnsureCapacity(); confirm against tdatablock.
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
  }
  return code;
}
778

779
/*
 * Wrap a compact-db response into a freshly allocated SRetrieveTableRsp.
 *
 * The result block produced by buildCompactDbBlock() is encoded into the
 * response payload behind a PAYLOAD_PREFIX_LEN-byte prefix, and the header
 * fields (row/column counts, payload and compressed lengths) are filled in
 * with htobe64()/htonl().
 *
 * On success *pRsp points to the new response and TSDB_CODE_SUCCESS is
 * returned. On failure after the block was built, *pRsp is freed and reset
 * to NULL and the error code is returned; if building the block itself
 * fails, *pRsp is not touched.
 */
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildCompactDbBlock(pCompactDb, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;
  (*pRsp)->compLen = 0;
  (*pRsp)->payloadLen = 0;
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);

  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, COMPACT_DB_RESULT_COLS);
  if (len < 0) {
    uError("buildRetriveTableRspForCompactDb error, len:%d", len);
    code = terrno;
    goto _exit;
  }

  // The block is no longer needed once encoded. Clear the pointer so the
  // _exit path below cannot destroy it a second time.
  blockDataDestroy(pBlock);
  pBlock = NULL;

  SET_PAYLOAD_LEN((*pRsp)->data, len, len);

  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
  (*pRsp)->payloadLen = htonl(payloadLen);
  (*pRsp)->compLen = htonl(payloadLen);

  // Sanity check: the encoded payload must fill exactly the space reserved for it.
  if ((size_t)payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
    uError("buildRetriveTableRspForCompactDb error, payloadLen:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64,
           payloadLen, (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto _exit;
  }

  return TSDB_CODE_SUCCESS;
_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
836

837
/*
 * Response handler for TDMT_MND_COMPACT_DB.
 *
 * On a successful response the payload is deserialized, converted into a
 * SRetrieveTableRsp and installed as the request's query result. On an
 * incoming error the code is recorded on the request instead. In both cases
 * the message buffers (pMsg->pData, pMsg->pEpSet) are freed here and the
 * request is completed, either by invoking the registered query callback or
 * by posting the response semaphore.
 *
 * Returns the final status code.
 */
int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    SCompactDbRsp      compactRsp = {0};
    SRetrieveTableRsp* pRetrieveRsp = NULL;

    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, &compactRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildRetriveTableRspForCompactDb(&compactRsp, &pRetrieveRsp);
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, pRequest->stmtBindVersion > 0);
    }

    if (code != 0) {
      // Detach the response from the result info before releasing it.
      pRequest->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieveRsp);
    }
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pRequest->body.queryFp) {
    // No callback registered: signal completion through the response semaphore.
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  } else {
    SSyncQueryParam* pSyncParam = (SSyncQueryParam*)pRequest->body.interParam;
    pRequest->body.queryFp(pSyncParam->userParam, pRequest, code);
  }

  return code;
}
870

871
/*
 * Build the one-row, three-column result block for a scan-db response.
 *
 * Columns, in order:
 *   0: result - VARCHAR, SCAN_DB_RESULT_FIELD1_LEN bytes
 *   1: id     - INT
 *   2: reason - VARCHAR, SCAN_DB_RESULT_FIELD3_LEN bytes
 *
 * When pRsp->bAccepted is set the row is ("accepted", pRsp->scanId, "success"),
 * otherwise it is ("rejected", NULL, "scan is ongoing").
 *
 * On success *block receives the new block and TSDB_CODE_SUCCESS is returned.
 * On failure *block is left untouched, everything allocated here is released
 * and the error code is returned.
 */
static int32_t buildScanDbBlock(SScanDbRsp* pRsp, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(SCAN_DB_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // Column schema: result (varchar), id (int), reason (varchar).
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = SCAN_DB_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  infoData.info.type = TSDB_DATA_TYPE_INT;
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = SCAN_DB_RESULT_FIELD3_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);

  char result[SCAN_DB_RESULT_FIELD1_LEN] = {0};
  char reason[SCAN_DB_RESULT_FIELD3_LEN] = {0};
  if (pRsp->bAccepted) {
    STR_TO_VARSTR(result, "accepted");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->scanId, false);
    TSDB_CHECK_CODE(code, line, END);
    STR_TO_VARSTR(reason, "success");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  } else {
    STR_TO_VARSTR(result, "rejected");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    // A rejected request has no scan id to report.
    colDataSetNULL(pIdCol, 0);
    STR_TO_VARSTR(reason, "scan is ongoing");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  }
  pBlock->info.rows = 1;

  *block = pBlock;

  return TSDB_CODE_SUCCESS;
END:
  // pBlock is still NULL when the very first allocation failed. Otherwise
  // release it through blockDataDestroy(), the routine the caller uses for
  // this block, rather than freeing the struct and then reading
  // pBlock->pDataBlock out of the freed memory.
  // NOTE(review): presumably this also releases any column buffers set up by
  // blockDataEnsureCapacity(); confirm against tdatablock.
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
  }
  return code;
}
933

934
/*
 * Wrap a scan-db response into a freshly allocated SRetrieveTableRsp.
 *
 * The result block produced by buildScanDbBlock() is encoded into the
 * response payload behind a PAYLOAD_PREFIX_LEN-byte prefix, and the header
 * fields (row/column counts, payload and compressed lengths) are filled in
 * with htobe64()/htonl().
 *
 * On success *pRsp points to the new response and TSDB_CODE_SUCCESS is
 * returned. On failure after the block was built, *pRsp is freed and reset
 * to NULL and the error code is returned; if building the block itself
 * fails, *pRsp is not touched.
 */
static int32_t buildRetriveTableRspForScanDb(SScanDbRsp* pScanDb, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildScanDbBlock(pScanDb, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;
  (*pRsp)->compLen = 0;
  (*pRsp)->payloadLen = 0;
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(SCAN_DB_RESULT_COLS);

  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SCAN_DB_RESULT_COLS);
  if (len < 0) {
    uError("%s error, len:%d", __func__, len);
    code = terrno;
    goto _exit;
  }

  // The block is no longer needed once encoded. Clear the pointer so the
  // _exit path below cannot destroy it a second time.
  blockDataDestroy(pBlock);
  pBlock = NULL;

  SET_PAYLOAD_LEN((*pRsp)->data, len, len);

  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
  (*pRsp)->payloadLen = htonl(payloadLen);
  (*pRsp)->compLen = htonl(payloadLen);

  // Sanity check: the encoded payload must fill exactly the space reserved for it.
  if ((size_t)payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
    uError("%s error, payloadLen:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, __func__, payloadLen,
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto _exit;
  }

  return TSDB_CODE_SUCCESS;
_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
991

992
/*
 * Response handler for TDMT_MND_SCAN_DB.
 *
 * On a successful response the payload is deserialized, converted into a
 * SRetrieveTableRsp and installed as the request's query result. On an
 * incoming error the code is recorded on the request instead. In both cases
 * the message buffers (pMsg->pData, pMsg->pEpSet) are freed here and the
 * request is completed, either by invoking the registered query callback or
 * by posting the response semaphore.
 *
 * Returns the final status code.
 */
static int32_t processScanDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    SScanDbRsp         scanRsp = {0};
    SRetrieveTableRsp* pRetrieveRsp = NULL;

    code = tDeserializeSScanDbRsp(pMsg->pData, pMsg->len, &scanRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildRetriveTableRspForScanDb(&scanRsp, &pRetrieveRsp);
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, pRequest->stmtBindVersion > 0);
    }

    if (code != 0) {
      // Detach the response from the result info before releasing it.
      pRequest->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieveRsp);
    }
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pRequest->body.queryFp) {
    // No callback registered: signal completion through the response semaphore.
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  } else {
    SSyncQueryParam* pSyncParam = (SSyncQueryParam*)pRequest->body.interParam;
    pRequest->body.queryFp(pSyncParam->userParam, pRequest, code);
  }

  return code;
}
1025

1026
/*
 * Response handler for TDMT_MND_TRIM_DB.
 *
 * The trim response is decoded with the compact-db deserializer and turned
 * into a result set with the compact-db response builder. On an incoming
 * error the code is recorded on the request instead. In both cases the
 * message buffers (pMsg->pData, pMsg->pEpSet) are freed here and the request
 * is completed, either by invoking the registered query callback or by
 * posting the response semaphore.
 *
 * Returns the final status code.
 */
int32_t processTrimDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    STrimDbRsp         trimRsp = {0};
    SRetrieveTableRsp* pRetrieveRsp = NULL;

    // NOTE(review): the response is cast for the deserializer but passed
    // uncast to the builder below; presumably STrimDbRsp is an alias of
    // SCompactDbRsp. Confirm against the type declarations.
    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, (SCompactDbRsp*)&trimRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildRetriveTableRspForCompactDb(&trimRsp, &pRetrieveRsp);
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, pRequest->stmtBindVersion > 0);
    }

    if (code != 0) {
      // Detach the response from the result info before releasing it.
      pRequest->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieveRsp);
    }
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pRequest->body.queryFp) {
    // No callback registered: signal completion through the response semaphore.
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  } else {
    SSyncQueryParam* pSyncParam = (SSyncQueryParam*)pRequest->body.interParam;
    pRequest->body.queryFp(pSyncParam->userParam, pRequest, code);
  }

  return code;
}
1059

1060
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
20,632,486✔
1061
  switch (msgType) {
20,632,486✔
1062
    case TDMT_MND_CONNECT:
2,879,897✔
1063
      return processConnectRsp;
2,879,897✔
1064
    case TDMT_MND_CREATE_DB:
1,395,939✔
1065
      return processCreateDbRsp;
1,395,939✔
1066
    case TDMT_MND_USE_DB:
2,602,553✔
1067
      return processUseDbRsp;
2,602,553✔
1068
    case TDMT_MND_CREATE_STB:
2,006,427✔
1069
      return processCreateSTableRsp;
2,006,427✔
1070
    case TDMT_MND_DROP_DB:
1,246,989✔
1071
      return processDropDbRsp;
1,246,989✔
1072
    case TDMT_MND_ALTER_STB:
6,425,410✔
1073
      return processAlterStbRsp;
6,425,410✔
1074
    case TDMT_MND_SHOW_VARIABLES:
45,172✔
1075
      return processShowVariablesRsp;
45,172✔
1076
    case TDMT_MND_COMPACT_DB:
34,934✔
1077
      return processCompactDbRsp;
34,934✔
1078
    case TDMT_MND_TRIM_DB:
24,102✔
1079
      return processTrimDbRsp;
24,102✔
1080
    case TDMT_MND_SCAN_DB:
786✔
1081
      return processScanDbRsp;
786✔
1082
    default:
3,970,277✔
1083
      return genericRspCallback;
3,970,277✔
1084
  }
1085
}
1086

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc