• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4897

25 Dec 2025 10:17AM UTC coverage: 65.717% (-0.2%) from 65.929%
#4897

push

travis-ci

web-flow
fix: [6622889291] Fix invalid rowSize. (#34043)

186011 of 283047 relevant lines covered (65.72%)

113853896.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.24
/source/client/src/clientMsgHandler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "cmdnodes.h"
21
#include "command.h"
22
#include "os.h"
23
#include "query.h"
24
#include "systable.h"
25
#include "tdatablock.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tmsg.h"
29
#include "tname.h"
30
#include "tversion.h"
31

32
extern SClientHbMgr clientHbMgr;
33

34
// Store `code` as the thread's terrno and on the request object so that both
// the caller of this thread and whoever later inspects the request see it.
static void setErrno(SRequestObj* pRequest, int32_t code) {
  terrno = code;
  pRequest->code = code;
}
5,043,327✔
38

39
// Generic response handler.
//
// param : the SRequestObj the response belongs to.
// pMsg  : response buffer; pMsg->pEpSet and pMsg->pData are freed here.
// code  : result code delivered with the response; recorded via setErrno and
//         returned unchanged.
//
// For request types matched by NEED_CLIENT_RM_TBLMETA_REQ the meta of the
// request's target tables is removed. The requester is then notified either
// through doRequestCallback (when body.queryFp is set) or by posting
// body.rspSem.
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;
  setErrno(pReq, code);

  if (NEED_CLIENT_RM_TBLMETA_REQ(pReq->type) &&
      0 != removeMeta(pReq->pTscObj, pReq->targetTableList, IS_VIEW_REQUEST(pReq->type))) {
    tscError("failed to remove meta data for table");
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  if (NULL == pReq->body.queryFp) {
    // no callback registered: release whoever waits on the semaphore
    if (0 != tsem_post(&pReq->body.rspSem)) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pReq, code);
  return code;
}
60

61
// Handle the response to a connect request.
//
// param : heap-allocated int64_t holding the request reference id; it is
//         always freed before returning.
// pMsg  : response buffer; pMsg->pEpSet and pMsg->pData are always freed.
// code  : result code delivered with the response.
//
// On success the connection object is populated from the decoded SConnectRsp
// (ids, versions, server config), the mnode ep-set is refreshed when needed,
// and the connection is registered with the heartbeat manager. Any failure is
// recorded with setErrno. Whenever the request could be acquired, body.rspSem
// is posted and the request reference is released.
//
// Returns the final result code.
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = acquireRequest(*(int64_t*)param);
  if (pRequest == NULL) {
    goto EXIT;
  }

  if (TSDB_CODE_SUCCESS != code) {
    goto End;
  }

  STscObj* pTscObj = pRequest->pTscObj;
  if (pTscObj->pAppInfo == NULL) {
    code = TSDB_CODE_TSC_DISCONNECTED;
    goto End;
  }

  // auth test connection, no need to process connect rsp
  if (CONN_TYPE__AUTH_TEST == pTscObj->connType) {
    goto End;
  }

  SConnectRsp connectRsp = {0};
  if (0 != tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp)) {
    code = TSDB_CODE_TSC_INVALID_VERSION;
    goto End;
  }

  code = taosCheckVersionCompatibleFromStr(td_version, connectRsp.sVer, 3);
  if (code != 0) {
    tscError("version not compatible. client version:%s, server version:%s", td_version, connectRsp.sVer);
    goto End;
  }

  // refuse the connection when client and server clocks are too far apart
  int32_t clientSec = taosGetTimestampSec();
  int32_t skewSec = abs(clientSec - connectRsp.svrTimestamp);
  if (skewSec > tsTimestampDeltaLimit) {
    code = TSDB_CODE_TIME_UNSYNCED;
    tscError("time diff:%ds is too big", skewSec);
    goto End;
  }

  if (0 == connectRsp.epSet.numOfEps) {
    code = TSDB_CODE_APP_ERROR;
    goto End;
  }

  // With exactly one dnode and one locally known endpoint only the default
  // rpc address is set; otherwise the stored mnode ep-set is replaced when it
  // differs from the one in the response.
  SEpSet     srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  const bool singleEndpoint = (connectRsp.dnodeNum == 1) && (srcEpSet.numOfEps == 1);
  if (singleEndpoint) {
    if (0 != rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
                               connectRsp.epSet.eps[connectRsp.epSet.inUse].fqdn)) {
      tscError("failed to set default addr for rpc");
    }
  } else if (!isEpsetEqual(&srcEpSet, &connectRsp.epSet)) {
    SEpSet corEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

    SEpSet* pOrig = &corEpSet;
    SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
    SEp*    pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,
             pOrigEp->fqdn, pOrigEp->port, connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn,
             pNewEp->port);
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
  }

  for (int32_t epIdx = 0; epIdx < connectRsp.epSet.numOfEps; ++epIdx) {
    tscDebug("QID:0x%" PRIx64 ", epSet.fqdn[%d]:%s port:%d, conn:0x%" PRIx64, pRequest->requestId, epIdx,
             connectRsp.epSet.eps[epIdx].fqdn, connectRsp.epSet.eps[epIdx].port, pTscObj->id);
  }

  // copy the per-connection attributes out of the response
  pTscObj->sysInfo = connectRsp.sysInfo;
  pTscObj->connId = connectRsp.connId;
  pTscObj->acctId = connectRsp.acctId;
  if (0 != pTscObj->user[0]) {
    pTscObj->tokenName[0] = 0;
  } else {
    // no user name set locally: take user and token name from the response
    tstrncpy(pTscObj->user, connectRsp.user, tListLen(pTscObj->user));
    tstrncpy(pTscObj->tokenName, connectRsp.tokenName, tListLen(pTscObj->tokenName));
  }
  tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer));
  tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer));

  // update the appInstInfo
  pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
  pTscObj->pAppInfo->serverCfg.monitorParas = connectRsp.monitorParas;
  pTscObj->pAppInfo->serverCfg.enableAuditDelete = connectRsp.enableAuditDelete;
  pTscObj->pAppInfo->serverCfg.enableAuditSelect = connectRsp.enableAuditSelect;
  pTscObj->pAppInfo->serverCfg.enableAuditInsert = connectRsp.enableAuditInsert;
  pTscObj->pAppInfo->serverCfg.auditLevel = connectRsp.auditLevel;
  tscDebug("monitor paras from connect rsp, clusterId:0x%" PRIx64 ", threshold:%d scope:%d",
           connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope);
  lastClusterId = connectRsp.clusterId;

  pTscObj->connType = connectRsp.connType;
  pTscObj->passInfo.ver = connectRsp.passVer;
  pTscObj->authVer = connectRsp.authVer;
  pTscObj->whiteListInfo.ver = connectRsp.whiteListVer;

  // first connection to this cluster: index the app instance by cluster id
  if (NULL == taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES)) {
    if (0 != taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo,
                         POINTER_BYTES)) {
      tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
    } else {
#ifdef USE_MONITOR
      MonitorSlowLogData data = {0};
      data.clusterId = pTscObj->pAppInfo->clusterId;
      data.type = SLOW_LOG_READ_BEGINNIG;
      (void)monitorPutData2MonitorQueue(data);  // ignore
      monitorClientSlowQueryInit(connectRsp.clusterId);
      monitorClientSQLReqInit(connectRsp.clusterId);
#endif
    }
  }

  // register the connection with its heartbeat manager under the manager lock
  (void)taosThreadMutexLock(&clientHbMgr.lock);
  SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
  if (NULL == pAppHbMgr) {
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    code = TSDB_CODE_TSC_DISCONNECTED;
    goto End;
  }
  if (0 != hbRegisterConn(pAppHbMgr, pTscObj->id, pTscObj->user, pTscObj->tokenName, connectRsp.clusterId, connectRsp.connType)) {
    tscError("QID:0x%" PRIx64 ", failed to register conn to hbMgr", pRequest->requestId);
  }
  (void)taosThreadMutexUnlock(&clientHbMgr.lock);

  tscDebug("QID:0x%" PRIx64 ", clusterId:0x%" PRIx64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
           pTscObj->pAppInfo->numOfConns);

End:
  // pRequest is never NULL here: the only NULL path jumps straight to EXIT
  if (0 != code) {
    setErrno(pRequest, code);
  }
  if (0 != tsem_post(&pRequest->body.rspSem)) {
    tscError("failed to post semaphore");
  }
  (void)releaseRequest(pRequest->self);

EXIT:
  taosMemoryFree(param);
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  return code;
}
214

215
// Allocate a zeroed SMsgSendInfo describing `pRequest`, targeted at the mnode.
//
// The request itself is stored as the callback parameter and the response
// handler is looked up from the request type with getMsgRspHandle.
//
// Returns the new send-info, or NULL when the allocation fails.
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pInfo) {
    return NULL;
  }

  // identity of the originating request
  pInfo->requestObjRefId = pRequest->self;
  pInfo->requestId = pRequest->requestId;
  pInfo->param = pRequest;

  // message routing and payload
  pInfo->msgType = pRequest->type;
  pInfo->target.type = TARGET_TYPE_MNODE;
  pInfo->msgInfo = pRequest->body.requestMsg;

  pInfo->fp = getMsgRspHandle(pRequest->type);
  return pInfo;
}
228

229
// Handle the response to a create-database request.
//
// param : the SRequestObj the response belongs to.
// pMsg  : response buffer; pMsg->pData and pMsg->pEpSet are freed up front.
// code  : result code delivered with the response.
//
// On success the vgroup info of the information-schema and performance-schema
// databases is refreshed in the catalog; a refresh failure is only logged. The
// requester is notified via doRequestCallback or by posting body.rspSem.
//
// Returns `code`, replaced by the catalogGetHandle result on the success path.
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  // todo rsp with the vnode id list
  SRequestObj* pRequest = param;
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (TSDB_CODE_SUCCESS == code) {
    struct SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (code == TSDB_CODE_SUCCESS) {
      STscObj* pTscObj = pRequest->pTscObj;

      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

      const char* sysDbs[] = {TSDB_INFORMATION_SCHEMA_DB, TSDB_PERFORMANCE_SCHEMA_DB};
      char        dbFName[TSDB_DB_FNAME_LEN];
      for (size_t k = 0; k < sizeof(sysDbs) / sizeof(sysDbs[0]); ++k) {
        // full name is "<acctId>.<db>"
        (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, sysDbs[k]);
        if (0 != catalogRefreshDBVgInfo(pCatalog, &conn, dbFName)) {
          tscError("QID:0x%" PRIx64 ", failed to refresh db vg info", pRequest->requestId);
        }
      }
    }
  } else {
    setErrno(pRequest, code);
  }

  if (NULL == pRequest->body.queryFp) {
    if (0 != tsem_post(&pRequest->body.rspSem)) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pRequest, code);
  return code;
}
267

268
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,518,458✔
269
  SRequestObj* pRequest = param;
2,518,458✔
270
  if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
2,518,458✔
271
      TSDB_CODE_MND_DB_IN_DROPPING == code) {
272
    SUseDbRsp usedbRsp = {0};
1,938✔
273
    if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
1,938✔
274
      tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
1,938✔
275
    }
276
    struct SCatalog* pCatalog = NULL;
1,938✔
277

278
    if (usedbRsp.vgVersion >= 0) {  // cached in local
1,938✔
279
      int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
1,938✔
280
      int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
1,938✔
281
      if (code1 != TSDB_CODE_SUCCESS) {
1,938✔
282
        tscWarn("QID:0x%" PRIx64 ", catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
×
283
                tstrerror(code1));
284
      } else {
285
        if (catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid) != 0) {
1,938✔
286
          tscError("QID:0x%" PRIx64 ", catalogRemoveDB failed, db:%s, uid:%" PRId64, pRequest->requestId, usedbRsp.db,
×
287
                   usedbRsp.uid);
288
        }
289
      }
290
    }
291
    tFreeSUsedbRsp(&usedbRsp);
1,938✔
292
  }
293

294
  if (code != TSDB_CODE_SUCCESS) {
2,518,458✔
295
    taosMemoryFree(pMsg->pData);
4,349✔
296
    taosMemoryFree(pMsg->pEpSet);
4,349✔
297
    setErrno(pRequest, code);
4,349✔
298

299
    if (pRequest->body.queryFp != NULL) {
4,349✔
300
      doRequestCallback(pRequest, pRequest->code);
4,349✔
301

302
    } else {
303
      if (tsem_post(&pRequest->body.rspSem) != 0) {
×
304
        tscError("failed to post semaphore");
×
305
      }
306
    }
307

308
    return code;
4,349✔
309
  }
310

311
  SUseDbRsp usedbRsp = {0};
2,514,109✔
312
  if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
2,514,109✔
313
    tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
×
314
  }
315

316
  if (strlen(usedbRsp.db) == 0) {
2,514,076✔
317
    taosMemoryFree(pMsg->pData);
×
318
    taosMemoryFree(pMsg->pEpSet);
×
319

320
    if (usedbRsp.errCode != 0) {
×
321
      return usedbRsp.errCode;
×
322
    } else {
323
      return TSDB_CODE_APP_ERROR;
×
324
    }
325
  }
326

327
  tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
2,514,076✔
328
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
4,968,426✔
329
    SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
2,454,350✔
330
    if (pInfo == NULL) {
2,454,350✔
331
      continue;
×
332
    }
333
    tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
2,454,350✔
334
    for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
5,111,926✔
335
      tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
2,657,576✔
336
    }
337
  }
338

339
  SName name = {0};
2,514,076✔
340
  if (tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB) != TSDB_CODE_SUCCESS) {
2,514,076✔
341
    tscError("QID:0x%" PRIx64 ", failed to parse db name:%s", pRequest->requestId, usedbRsp.db);
×
342
  }
343

344
  SUseDbOutput output = {0};
2,514,318✔
345
  code = queryBuildUseDbOutput(&output, &usedbRsp);
2,514,109✔
346
  if (code != 0) {
2,514,109✔
347
    terrno = code;
×
348
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
×
349

350
    tscError("QID:0x%" PRIx64 ", failed to build use db output since %s", pRequest->requestId, terrstr());
×
351
  } else if (output.dbVgroup && output.dbVgroup->vgHash) {
2,514,109✔
352
    struct SCatalog* pCatalog = NULL;
1,113,895✔
353

354
    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,113,895✔
355
    if (code1 != TSDB_CODE_SUCCESS) {
1,113,895✔
356
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
×
357
              tstrerror(code1));
358
    } else {
359
      if (catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup) != 0) {
1,113,895✔
360
        tscError("QID:0x%" PRIx64 ", failed to update db vg info, db:%s, dbId:%" PRId64, pRequest->requestId, output.db,
×
361
                 output.dbId);
362
      }
363
      output.dbVgroup = NULL;
1,113,895✔
364
    }
365
  }
366

367
  taosMemoryFreeClear(output.dbVgroup);
2,514,109✔
368
  tFreeSUsedbRsp(&usedbRsp);
2,514,109✔
369

370
  char db[TSDB_DB_NAME_LEN] = {0};
2,514,318✔
371
  if (tNameGetDbName(&name, db) != TSDB_CODE_SUCCESS) {
2,514,318✔
372
    tscError("QID:0x%" PRIx64 ", failed to get db name since %s", pRequest->requestId, tstrerror(code));
×
373
  }
374

375
  setConnectionDB(pRequest->pTscObj, db);
2,514,109✔
376

377
  taosMemoryFree(pMsg->pData);
2,514,287✔
378
  taosMemoryFree(pMsg->pEpSet);
2,514,109✔
379

380
  if (pRequest->body.queryFp != NULL) {
2,514,076✔
381
    doRequestCallback(pRequest, pRequest->code);
2,513,818✔
382
  } else {
383
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
384
      tscError("failed to post semaphore");
×
385
    }
386
  }
387
  return 0;
2,514,078✔
388
}
389

390
// Handle the response to a create-super-table request.
//
// param : the SRequestObj the response belongs to; NULL is rejected.
// pMsg  : response buffer; NULL is rejected, otherwise pMsg->pEpSet and
//         pMsg->pData are freed here.
// code  : result code delivered with the response.
//
// On success a non-empty payload is decoded into an SMCreateStbRsp and its
// pMeta is stored as the request's execution result. When a callback is
// registered, the result is handed to handleCreateTbExecRes before
// doRequestCallback runs; otherwise body.rspSem is posted.
//
// Returns TSDB_CODE_TSC_INVALID_INPUT for NULL arguments, else the final code.
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    tscError("processCreateSTableRsp: invalid input param, pMsg is NULL");
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  if (param == NULL) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
    tscError("processCreateSTableRsp: invalid input param, param is NULL");
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SRequestObj* pRequest = param;

  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  } else {
    SMCreateStbRsp createRsp = {0};
    SDecoder       coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    if (pMsg->len > 0) {  // an empty payload is not decoded
      code = tDecodeSMCreateStbRsp(&coder, &createRsp);
      if (code != TSDB_CODE_SUCCESS) {
        setErrno(pRequest, code);
      }
    }
    tDecoderClear(&coder);

    pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
    pRequest->body.resInfo.execRes.res = createRsp.pMeta;
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  if (pRequest->body.queryFp != NULL) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;

    if (code == TSDB_CODE_SUCCESS) {
      SCatalog* pCatalog = NULL;
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
      // Only use the catalog when the lookup succeeded; otherwise keep the
      // lookup error instead of overwriting it with the handler's result.
      if (ret == TSDB_CODE_SUCCESS && pRes->res != NULL) {
        ret = handleCreateTbExecRes(pRes->res, pCatalog);
      }

      if (ret != TSDB_CODE_SUCCESS) {
        code = ret;
      }
    }

    doRequestCallback(pRequest, code);
  } else {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  }
  return code;
}
448

449
// Handle the response to a drop-database request.
//
// param : the SRequestObj the response belongs to.
// pMsg  : response buffer; pMsg->pData and pMsg->pEpSet are freed here.
// code  : result code delivered with the response.
//
// On success the dropped database is removed from the catalog and the vgroup
// info of the information-schema and performance-schema databases is
// refreshed; failures of those catalog calls are only logged. The requester is
// notified via doRequestCallback or by posting body.rspSem.
//
// Returns `code`, replaced by the catalogGetHandle result on the success path.
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    SDropDbRsp dropdbRsp = {0};
    if (0 != tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp)) {
      tscError("QID:0x%" PRIx64 ", deserialize SDropDbRsp failed", pRequest->requestId);
    }

    struct SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (code == TSDB_CODE_SUCCESS) {
      if (0 != catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid)) {
        tscError("QID:0x%" PRIx64 ", failed to remove db:%s", pRequest->requestId, dropdbRsp.db);
      }
      STscObj* pTscObj = pRequest->pTscObj;

      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

      const char* sysDbs[] = {TSDB_INFORMATION_SCHEMA_DB, TSDB_PERFORMANCE_SCHEMA_DB};
      char        dbFName[TSDB_DB_FNAME_LEN] = {0};
      for (size_t k = 0; k < sizeof(sysDbs) / sizeof(sysDbs[0]); ++k) {
        // full name is "<acctId>.<db>"
        (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, sysDbs[k]);
        if (0 != catalogRefreshDBVgInfo(pCatalog, &conn, dbFName)) {
          tscError("QID:0x%" PRIx64 ", failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
        }
      }
    }
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pRequest->body.queryFp) {
    if (0 != tsem_post(&pRequest->body.rspSem)) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pRequest, code);
  return code;
}
494

495
// Handle the response to an alter-super-table request.
//
// param : the SRequestObj the response belongs to.
// pMsg  : response buffer; pMsg->pData and pMsg->pEpSet are freed here.
// code  : result code delivered with the response.
//
// On success a non-empty payload is decoded into an SMAlterStbRsp and its
// pMeta is stored as the request's execution result. When a callback is
// registered, the result is handed to handleAlterTbExecRes before
// doRequestCallback runs; otherwise body.rspSem is posted.
//
// Returns the final result code.
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  } else {
    SMAlterStbRsp alterRsp = {0};
    SDecoder      coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    if (pMsg->len > 0) {  // an empty payload is not decoded
      code = tDecodeSMAlterStbRsp(&coder, &alterRsp);
      if (code != TSDB_CODE_SUCCESS) {
        setErrno(pRequest, code);
      }
    }
    tDecoderClear(&coder);

    pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
    pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (pRequest->body.queryFp != NULL) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;

    if (code == TSDB_CODE_SUCCESS) {
      SCatalog* pCatalog = NULL;
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
      // Only use the catalog when the lookup succeeded; otherwise keep the
      // lookup error instead of overwriting it with the handler's result.
      if (ret == TSDB_CODE_SUCCESS && pRes->res != NULL) {
        ret = handleAlterTbExecRes(pRes->res, pCatalog);
      }

      if (ret != TSDB_CODE_SUCCESS) {
        code = ret;
      }
    }

    doRequestCallback(pRequest, code);
  } else {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  }
  return code;
}
541

542
// Build a data block with one row per entry of `pVars` and five VARCHAR
// columns: name, value, scope, category and info.
//
// pVars : array of SVariablesInfo.
// block : receives the new block on success and is left untouched on failure.
//         The caller releases it with blockDataDestroy.
//
// Returns 0 on success, otherwise the error code of the failing step.
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // column schema: every column is a VARCHAR, only the width differs
  const int32_t   colBytes[] = {SHOW_VARIABLES_RESULT_FIELD1_LEN, SHOW_VARIABLES_RESULT_FIELD2_LEN,
                                SHOW_VARIABLES_RESULT_FIELD3_LEN, SHOW_VARIABLES_RESULT_FIELD4_LEN,
                                SHOW_VARIABLES_RESULT_FIELD5_LEN};
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  for (size_t k = 0; k < sizeof(colBytes) / sizeof(colBytes[0]); ++k) {
    infoData.info.bytes = colBytes[k];
    TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
  }

  int32_t numOfCfg = taosArrayGetSize(pVars);
  code = blockDataEnsureCapacity(pBlock, numOfCfg);
  TSDB_CHECK_CODE(code, line, END);

  // `c` walks the columns of the current row and restarts at 0 for each row
  for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
    SVariablesInfo* pInfo = taosArrayGet(pVars, i);
    TSDB_CHECK_NULL(pInfo, code, line, END, terrno);

    char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, name, false);
    TSDB_CHECK_CODE(code, line, END);

    char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, value, false);
    TSDB_CHECK_CODE(code, line, END);

    char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, scope, false);
    TSDB_CHECK_CODE(code, line, END);

    char category[TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(category, pInfo->category, TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, category, false);
    TSDB_CHECK_CODE(code, line, END);

    char info[TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(info, pInfo->info, TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, info, false);
    TSDB_CHECK_CODE(code, line, END);
  }

  pBlock->info.rows = numOfCfg;

  *block = pBlock;
  return code;

END:
  // pBlock is NULL when its own allocation failed, so it must not be
  // dereferenced here. blockDataDestroy is used (as the caller does for the
  // finished block) so that column buffers are released too.
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
  }
  return code;
}
626

627
// Encode the variables in `pVars` as an SRetrieveTableRsp.
//
// pVars : array of SVariablesInfo, converted to a data block first.
// pRsp  : receives the heap-allocated response on success (owned by the
//         caller); set to NULL on failure.
//
// The header's multi-byte fields are written in network byte order. The block
// payload is only encoded when the block has at least one row.
//
// Returns TSDB_CODE_SUCCESS or the error code of the failing step.
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
  *pRsp = NULL;

  SSDataBlock* pBlock = NULL;
  int32_t      code = buildShowVariablesBlock(pVars, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;

  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);

  int32_t len = 0;
  // Test the host-order row count: (*pRsp)->numOfRows is already byte-swapped
  // and may not compare as positive even when rows exist.
  if (pBlock->info.rows > 0) {
    len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SHOW_VARIABLES_RESULT_COLS);
    if (len < 0) {
      uError("buildShowVariablesRsp error, len:%d", len);
      code = terrno;
      goto _exit;
    }
    SET_PAYLOAD_LEN((*pRsp)->data, len, len);

    int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
    (*pRsp)->payloadLen = htonl(payloadLen);
    (*pRsp)->compLen = htonl(payloadLen);

    // the encoded payload must fill exactly the space reserved for it
    if ((size_t)payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
      uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
             (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
      code = TSDB_CODE_TSC_INVALID_INPUT;
      goto _exit;
    }
  }

  blockDataDestroy(pBlock);
  pBlock = NULL;

  return TSDB_CODE_SUCCESS;
_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
687

688
// Handle the response to a show-variables request.
//
// param : the SRequestObj the response belongs to.
// pMsg  : response buffer; pMsg->pData and pMsg->pEpSet are freed here.
// code  : result code delivered with the response.
//
// On success the payload is deserialized, re-encoded as a retrieve-table
// response and installed as the request's query result. If any of those steps
// fails, the request's pRspMsg is cleared and the built response is freed.
// The requester is notified via doRequestCallback or by posting body.rspSem.
//
// Returns the final result code.
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    SRetrieveTableRsp* pRetrieveRsp = NULL;
    SShowVariablesRsp  varsRsp = {0};

    code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &varsRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildShowVariablesRsp(varsRsp.variables, &pRetrieveRsp);
      if (code == TSDB_CODE_SUCCESS) {
        code = setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, pRequest->stmtBindVersion > 0);
      }
    }

    if (0 != code) {
      pRequest->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieveRsp);
    }
    tFreeSShowVariablesRsp(&varsRsp);
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pRequest->body.queryFp) {
    if (0 != tsem_post(&pRequest->body.rspSem)) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pRequest, code);
  return code;
}
722

723
/*
 * Build the one-row, three-column block that reports the outcome of a
 * compact request: (result VARCHAR, id INT, reason VARCHAR).
 *
 * pRsp   response to render; bAccepted selects the "accepted"/"rejected" row
 *        and compactId fills the id column when the request was accepted.
 * block  receives the newly allocated block on success and is left untouched
 *        on failure.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the step that failed. On
 * failure everything allocated here is released before returning.
 */
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // Column 0: textual result.
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  // Column 1: compact id.
  infoData.info.type = TSDB_DATA_TYPE_INT;
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  // Column 2: textual reason.
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);

  char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
  char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
  if (pRsp->bAccepted) {
    STR_TO_VARSTR(result, "accepted");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
    TSDB_CHECK_CODE(code, line, END);
    STR_TO_VARSTR(reason, "success");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  } else {
    STR_TO_VARSTR(result, "rejected");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    colDataSetNULL(pIdCol, 0);
    STR_TO_VARSTR(reason, "compaction is ongoing");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  }
  pBlock->info.rows = 1;

  *block = pBlock;

  return TSDB_CODE_SUCCESS;

END:
  // pBlock is NULL when the very first allocation failed, so it must be
  // checked before use. blockDataDestroy() is the same routine callers use to
  // release a block produced here; it is used instead of freeing the struct
  // and its column array by hand so the block is never touched after being
  // freed and the per-column buffers are released as well.
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
785

786
/*
 * Encode the compact result block into a freshly allocated SRetrieveTableRsp.
 *
 * pCompactDb  compact response to render.
 * pRsp        receives the allocated response on success; set to NULL when a
 *             failure happens after the response buffer was allocated.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. The intermediate data block is
 * always released before returning.
 */
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildCompactDbBlock(pCompactDb, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;
  (*pRsp)->compLen = 0;
  (*pRsp)->payloadLen = 0;
  // Multi-byte header fields are converted with htobe64/htonl before storing.
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);

  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, COMPACT_DB_RESULT_COLS);
  if (len < 0) {
    uError("buildRetriveTableRspForCompactDb error, len:%d", len);
    code = terrno;
    goto _exit;
  }

  // The block is no longer needed once encoded. Clear the pointer so the
  // shared _exit path below cannot destroy it a second time.
  blockDataDestroy(pBlock);
  pBlock = NULL;

  // NOTE(review): SET_PAYLOAD_LEN presumably records the two lengths in the
  // PAYLOAD_PREFIX_LEN bytes in front of the encoded block - confirm.
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);

  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
  (*pRsp)->payloadLen = htonl(payloadLen);
  (*pRsp)->compLen = htonl(payloadLen);

  // The encoded size must match what blockGetEncodeSize() predicted.
  if ((size_t)payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
    uError("buildRetriveTableRspForCompactDb error, payloadLen:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64,
           payloadLen, (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto _exit;
  }

  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
843

844
/*
 * Response handler for TDMT_MND_COMPACT_DB.
 *
 * A successful message is deserialized, converted into a retrieve-table
 * response and installed as the request's result set. The message buffers are
 * then released and the waiter is notified, either through the registered
 * query callback or by posting the request's response semaphore.
 *
 * Returns the incoming code, or the code of the first step that failed.
 */
int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    SCompactDbRsp      compactRsp = {0};
    SRetrieveTableRsp* pRetrieveRsp = NULL;

    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, &compactRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildRetriveTableRspForCompactDb(&compactRsp, &pRetrieveRsp);
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, pRequest->stmtBindVersion > 0);
    }

    if (0 != code) {
      // The result info must not keep pointing at the buffer released below.
      pRequest->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieveRsp);
    }
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL != pRequest->body.queryFp) {
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
  } else if (tsem_post(&pRequest->body.rspSem) != 0) {
    tscError("failed to post semaphore");
  }

  return code;
}
877

878
/*
 * Build the one-row, three-column block that reports the outcome of a scan
 * request: (result VARCHAR, id INT, reason VARCHAR).
 *
 * pRsp   response to render; bAccepted selects the "accepted"/"rejected" row
 *        and scanId fills the id column when the request was accepted.
 * block  receives the newly allocated block on success and is left untouched
 *        on failure.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the step that failed. On
 * failure everything allocated here is released before returning.
 */
static int32_t buildScanDbBlock(SScanDbRsp* pRsp, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(SCAN_DB_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // Column 0: textual result.
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = SCAN_DB_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  // Column 1: scan id.
  infoData.info.type = TSDB_DATA_TYPE_INT;
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  // Column 2: textual reason.
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = SCAN_DB_RESULT_FIELD3_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);

  char result[SCAN_DB_RESULT_FIELD1_LEN] = {0};
  char reason[SCAN_DB_RESULT_FIELD3_LEN] = {0};
  if (pRsp->bAccepted) {
    STR_TO_VARSTR(result, "accepted");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->scanId, false);
    TSDB_CHECK_CODE(code, line, END);
    STR_TO_VARSTR(reason, "success");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  } else {
    STR_TO_VARSTR(result, "rejected");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    colDataSetNULL(pIdCol, 0);
    STR_TO_VARSTR(reason, "scan is ongoing");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  }
  pBlock->info.rows = 1;

  *block = pBlock;

  return TSDB_CODE_SUCCESS;

END:
  // pBlock is NULL when the very first allocation failed, so it must be
  // checked before use. blockDataDestroy() is the same routine callers use to
  // release a block produced here; it is used instead of freeing the struct
  // and its column array by hand so the block is never touched after being
  // freed and the per-column buffers are released as well.
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
940

941
/*
 * Encode the scan result block into a freshly allocated SRetrieveTableRsp.
 *
 * pScanDb  scan response to render.
 * pRsp     receives the allocated response on success; set to NULL when a
 *          failure happens after the response buffer was allocated.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. The intermediate data block is
 * always released before returning.
 */
static int32_t buildRetriveTableRspForScanDb(SScanDbRsp* pScanDb, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildScanDbBlock(pScanDb, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;
  (*pRsp)->compLen = 0;
  (*pRsp)->payloadLen = 0;
  // Multi-byte header fields are converted with htobe64/htonl before storing.
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(SCAN_DB_RESULT_COLS);

  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SCAN_DB_RESULT_COLS);
  if (len < 0) {
    uError("%s error, len:%d", __func__, len);
    code = terrno;
    goto _exit;
  }

  // The block is no longer needed once encoded. Clear the pointer so the
  // shared _exit path below cannot destroy it a second time.
  blockDataDestroy(pBlock);
  pBlock = NULL;

  // NOTE(review): SET_PAYLOAD_LEN presumably records the two lengths in the
  // PAYLOAD_PREFIX_LEN bytes in front of the encoded block - confirm.
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);

  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
  (*pRsp)->payloadLen = htonl(payloadLen);
  (*pRsp)->compLen = htonl(payloadLen);

  // The encoded size must match what blockGetEncodeSize() predicted.
  if ((size_t)payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
    uError("%s error, payloadLen:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, __func__, payloadLen,
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto _exit;
  }

  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
998

999
/*
 * Response handler for TDMT_MND_SCAN_DB.
 *
 * A successful message is deserialized, converted into a retrieve-table
 * response and installed as the request's result set. The message buffers are
 * then released and the waiter is notified, either through the registered
 * query callback or by posting the request's response semaphore.
 *
 * Returns the incoming code, or the code of the first step that failed.
 */
static int32_t processScanDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    SScanDbRsp         scanRsp = {0};
    SRetrieveTableRsp* pRetrieveRsp = NULL;

    code = tDeserializeSScanDbRsp(pMsg->pData, pMsg->len, &scanRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildRetriveTableRspForScanDb(&scanRsp, &pRetrieveRsp);
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, pRequest->stmtBindVersion > 0);
    }

    if (0 != code) {
      // The result info must not keep pointing at the buffer released below.
      pRequest->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieveRsp);
    }
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL != pRequest->body.queryFp) {
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
  } else if (tsem_post(&pRequest->body.rspSem) != 0) {
    tscError("failed to post semaphore");
  }

  return code;
}
1032

1033
/*
 * Response handler for TDMT_MND_TRIM_DB.
 *
 * The trim response is decoded with the compact-response deserializer and
 * rendered with the compact-response builder, so the user sees the same
 * result layout as for a compact request. The message buffers are released
 * in every case and the waiter is notified, either through the registered
 * query callback or by posting the request's response semaphore.
 *
 * Returns the incoming code, or the code of the first step that failed.
 */
int32_t processTrimDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    STrimDbRsp         trimRsp = {0};
    SRetrieveTableRsp* pRetrieveRsp = NULL;

    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, (SCompactDbRsp*)&trimRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildRetriveTableRspForCompactDb(&trimRsp, &pRetrieveRsp);
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, pRequest->stmtBindVersion > 0);
    }

    if (0 != code) {
      // The result info must not keep pointing at the buffer released below.
      pRequest->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieveRsp);
    }
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL != pRequest->body.queryFp) {
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
  } else if (tsem_post(&pRequest->body.rspSem) != 0) {
    tscError("failed to post semaphore");
  }

  return code;
}
1066

1067
/*
 * Build the one-row, one-column VARCHAR block that returns a newly created
 * token to the user.
 *
 * pRsp   response carrying the token string.
 * block  receives the newly allocated block on success and is left untouched
 *        on failure.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the step that failed. On
 * failure everything allocated here is released before returning.
 */
static int32_t buildCreateTokenBlock(SCreateTokenRsp* pRsp, SSDataBlock** block) {
  int32_t code = 0;
  int32_t line = 0;

  // Exactly one block is needed; the column count belongs to the column
  // array below, not to this allocation.
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(CREATE_USER_TOKEN_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = CREATE_USER_TOKEN_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);

  // NOTE(review): the staging buffer is a fixed 192 bytes; this assumes the
  // token plus its var-string header always fits - confirm against the
  // definition of SCreateTokenRsp.
  char result[128 + 64] = {0};
  STR_TO_VARSTR(result, pRsp->token);
  code = colDataSetVal(pResultCol, 0, result, false);
  TSDB_CHECK_CODE(code, line, END);

  pBlock->info.rows = 1;

  *block = pBlock;
  return TSDB_CODE_SUCCESS;

END:
  // pBlock is NULL when the very first allocation failed, so it must be
  // checked before use. blockDataDestroy() is the same routine callers use to
  // release a block produced here; it is used instead of freeing the struct
  // and its column array by hand so the block is never touched after being
  // freed and the per-column buffers are released as well.
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
1102

1103
/*
 * Encode the create-token result block into a freshly allocated
 * SRetrieveTableRsp.
 *
 * pResp  create-token response to render.
 * pRsp   receives the allocated response on success; set to NULL when a
 *        failure happens after the response buffer was allocated.
 *
 * Returns TSDB_CODE_SUCCESS or an error code. The intermediate data block is
 * always released before returning.
 */
static int32_t buildTableRspForCreateToken(SCreateTokenRsp* pResp, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildCreateTokenBlock(pResp, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;

  // Multi-byte header fields are converted with htobe64/htonl before storing.
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(CREATE_USER_TOKEN_RESULT_COLS);

  int32_t len =
      blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, CREATE_USER_TOKEN_RESULT_COLS);
  if (len < 0) {
    uError("%s error, len:%d", __func__, len);
    code = terrno;
    goto _exit;
  }

  // The block is no longer needed once encoded. Clear the pointer so the
  // shared _exit path below cannot destroy it a second time.
  blockDataDestroy(pBlock);
  pBlock = NULL;

  // NOTE(review): SET_PAYLOAD_LEN presumably records the two lengths in the
  // PAYLOAD_PREFIX_LEN bytes in front of the encoded block - confirm.
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);

  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
  (*pRsp)->payloadLen = htonl(payloadLen);
  (*pRsp)->compLen = htonl(payloadLen);

  // The encoded size must match what blockGetEncodeSize() predicted.
  if ((size_t)payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
    uError("%s error, payloadLen:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, __func__, payloadLen,
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto _exit;
  }
  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
1160

1161
/*
 * Response handler for TDMT_MND_CREATE_TOKEN.
 *
 * A successful message is deserialized, converted into a retrieve-table
 * response and installed as the request's result set. The message buffers are
 * then released and the waiter is notified, either through the registered
 * query callback or by posting the request's response semaphore.
 *
 * Returns the incoming code, or the code of the first step that failed.
 */
int32_t processCreateTokenRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    SCreateTokenRsp    tokenRsp = {0};
    SRetrieveTableRsp* pRetrieveRsp = NULL;

    code = tDeserializeSCreateTokenResp(pMsg->pData, pMsg->len, &tokenRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildTableRspForCreateToken(&tokenRsp, &pRetrieveRsp);
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, pRequest->stmtBindVersion > 0);
    }

    if (0 != code) {
      // The result info must not keep pointing at the buffer released below.
      pRequest->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieveRsp);
    }
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pRequest->body.queryFp) {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  } else {
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
  }

  return code;
}
1192

1193
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
19,930,699✔
1194
  switch (msgType) {
19,930,699✔
1195
    case TDMT_MND_CONNECT:
2,711,073✔
1196
      return processConnectRsp;
2,711,073✔
1197
    case TDMT_MND_CREATE_DB:
1,366,987✔
1198
      return processCreateDbRsp;
1,366,987✔
1199
    case TDMT_MND_USE_DB:
2,518,323✔
1200
      return processUseDbRsp;
2,518,323✔
1201
    case TDMT_MND_CREATE_STB:
1,938,737✔
1202
      return processCreateSTableRsp;
1,938,737✔
1203
    case TDMT_MND_DROP_DB:
1,224,270✔
1204
      return processDropDbRsp;
1,224,270✔
1205
    case TDMT_MND_ALTER_STB:
6,170,442✔
1206
      return processAlterStbRsp;
6,170,442✔
1207
    case TDMT_MND_SHOW_VARIABLES:
43,193✔
1208
      return processShowVariablesRsp;
43,193✔
1209
    case TDMT_MND_COMPACT_DB:
38,741✔
1210
      return processCompactDbRsp;
38,741✔
1211
    case TDMT_MND_TRIM_DB:
23,666✔
1212
      return processTrimDbRsp;
23,666✔
1213
    case TDMT_MND_SCAN_DB:
680✔
1214
      return processScanDbRsp;
680✔
1215
    case TDMT_MND_CREATE_TOKEN:
129✔
1216
      return processCreateTokenRsp;
129✔
1217

1218
    default:
3,894,458✔
1219
      return genericRspCallback;
3,894,458✔
1220
  }
1221
}
1222

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc