• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4878

11 Dec 2025 02:43AM UTC coverage: 64.569% (-0.02%) from 64.586%
#4878

push

travis-ci

guanshengliang
feat(TS-7270): internal dependence

307 of 617 new or added lines in 24 files covered. (49.76%)

3821 existing lines in 123 files now uncovered.

163630 of 253417 relevant lines covered (64.57%)

107598827.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.35
/source/client/src/clientMsgHandler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "cmdnodes.h"
21
#include "command.h"
22
#include "os.h"
23
#include "query.h"
24
#include "systable.h"
25
#include "tdatablock.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tname.h"
29
#include "tversion.h"
30

31
extern SClientHbMgr clientHbMgr;
32

33
/*
 * Store `code` both in the request object and in terrno so that either
 * location can be consulted afterwards.
 */
static void setErrno(SRequestObj* pRequest, int32_t code) {
  terrno = code;
  pRequest->code = code;
}
5,370,890✔
37

38
/*
 * Generic response handler.
 *
 * Records `code` on the request, drops cached table meta for request types
 * that require it, releases the message buffers, and then completes the
 * request: through the async callback when one is registered, otherwise by
 * posting the request's response semaphore.
 *
 * param: the SRequestObj this response belongs to.
 * pMsg:  response buffer; pMsg->pEpSet and pMsg->pData are freed here.
 * Returns `code` unchanged.
 */
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;
  setErrno(pReq, code);

  // A failure to drop cached meta is only logged; it does not change the result.
  if ((NEED_CLIENT_RM_TBLMETA_REQ(pReq->type)) &&
      removeMeta(pReq->pTscObj, pReq->targetTableList, IS_VIEW_REQUEST(pReq->type)) != 0) {
    tscError("failed to remove meta data for table");
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  if (NULL == pReq->body.queryFp) {
    // No async callback registered: signal the response semaphore instead.
    if (tsem_post(&pReq->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pReq, code);
  return code;
}
59

60
/*
 * Handle the response to a connect request.
 *
 * param: heap-allocated int64_t holding the request's reference id; it is
 *        freed here on every path.
 * pMsg:  response buffer; pMsg->pEpSet and pMsg->pData are freed here.
 * code:  status delivered with the response.
 *
 * On success the connection object is populated from the decoded SConnectRsp
 * (server version, ids, mnode endpoint set, cluster id, monitor parameters)
 * and the connection is registered with the heartbeat manager. The response
 * semaphore is posted on every path on which the request could be acquired.
 * Returns the final status code.
 */
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = acquireRequest(*(int64_t*)param);
  if (pRequest == NULL) {
    goto EXIT;
  }

  if (TSDB_CODE_SUCCESS != code) {
    goto End;
  }

  STscObj* pTscObj = pRequest->pTscObj;
  if (pTscObj->pAppInfo == NULL) {
    code = TSDB_CODE_TSC_DISCONNECTED;
    goto End;
  }

  if (CONN_TYPE__AUTH_TEST == pTscObj->connType) {
    // auth test connection, no need to process connect rsp
    goto End;
  }

  SConnectRsp connectRsp = {0};
  if (0 != tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp)) {
    code = TSDB_CODE_TSC_INVALID_VERSION;
    goto End;
  }

  code = taosCheckVersionCompatibleFromStr(td_version, connectRsp.sVer, 3);
  if (code != 0) {
    tscError("version not compatible. client version:%s, server version:%s", td_version, connectRsp.sVer);
    goto End;
  }

  // Reject the connection when client and server clocks differ by more than the configured limit.
  int32_t nowSec = taosGetTimestampSec();
  int32_t skew = abs(nowSec - connectRsp.svrTimestamp);
  if (skew > tsTimestampDeltaLimit) {
    code = TSDB_CODE_TIME_UNSYNCED;
    tscError("time diff:%ds is too big", skew);
    goto End;
  }

  if (0 == connectRsp.epSet.numOfEps) {
    code = TSDB_CODE_APP_ERROR;
    goto End;
  }

  SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);

  // With a single dnode and a single locally known endpoint, only the default
  // rpc address is set and the stored mnode endpoint set is left untouched.
  bool singleEpShortcut = false;
  if (connectRsp.dnodeNum == 1 && srcEpSet.numOfEps == 1) {
    SEpSet dstEpSet = connectRsp.epSet;
    if (rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
                          dstEpSet.eps[dstEpSet.inUse].fqdn) != 0) {
      tscError("failed to set default addr for rpc");
    }
    singleEpShortcut = true;
  }

  if (!singleEpShortcut && !isEpsetEqual(&srcEpSet, &connectRsp.epSet)) {
    SEpSet corEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
    SEp*   pOldEp = &corEpSet.eps[corEpSet.inUse];
    SEp*   pRspEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];

    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", corEpSet.inUse, corEpSet.numOfEps,
             pOldEp->fqdn, pOldEp->port, connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pRspEp->fqdn,
             pRspEp->port);
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
  }

  for (int32_t idx = 0; idx < connectRsp.epSet.numOfEps; ++idx) {
    tscDebug("QID:0x%" PRIx64 ", epSet.fqdn[%d]:%s port:%d, conn:0x%" PRIx64, pRequest->requestId, idx,
             connectRsp.epSet.eps[idx].fqdn, connectRsp.epSet.eps[idx].port, pTscObj->id);
  }

  // Copy per-connection attributes out of the response.
  pTscObj->sysInfo = connectRsp.sysInfo;
  pTscObj->connId = connectRsp.connId;
  pTscObj->acctId = connectRsp.acctId;
  tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer));
  tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer));

  // update the appInstInfo
  pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
  pTscObj->pAppInfo->serverCfg.monitorParas = connectRsp.monitorParas;
  pTscObj->pAppInfo->serverCfg.enableAuditDelete = connectRsp.enableAuditDelete;
  tscDebug("monitor paras from connect rsp, clusterId:0x%" PRIx64 ", threshold:%d scope:%d",
           connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope);
  lastClusterId = connectRsp.clusterId;

  pTscObj->connType = connectRsp.connType;
  pTscObj->passInfo.ver = connectRsp.passVer;
  pTscObj->authVer = connectRsp.authVer;
  pTscObj->whiteListInfo.ver = connectRsp.whiteListVer;

  // First connection to this cluster: index the app instance by cluster id.
  if (NULL == taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES)) {
    if (taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo,
                    POINTER_BYTES) != 0) {
      tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
    } else {
#ifdef USE_MONITOR
      MonitorSlowLogData data = {0};
      data.clusterId = pTscObj->pAppInfo->clusterId;
      data.type = SLOW_LOG_READ_BEGINNIG;
      (void)monitorPutData2MonitorQueue(data);  // ignore
      monitorClientSlowQueryInit(connectRsp.clusterId);
      monitorClientSQLReqInit(connectRsp.clusterId);
#endif
    }
  }

  // Register the connection with the heartbeat manager under the manager lock.
  (void)taosThreadMutexLock(&clientHbMgr.lock);
  SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
  if (NULL == pAppHbMgr) {
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    code = TSDB_CODE_TSC_DISCONNECTED;
    goto End;
  }
  if (hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType) != 0) {
    tscError("QID:0x%" PRIx64 ", failed to register conn to hbMgr", pRequest->requestId);
  }
  (void)taosThreadMutexUnlock(&clientHbMgr.lock);

  tscDebug("QID:0x%" PRIx64 ", clusterId:0x%" PRIx64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
           pTscObj->pAppInfo->numOfConns);

End:
  // pRequest is always non-NULL here: the only NULL case jumps straight to EXIT.
  if (code != 0) {
    setErrno(pRequest, code);
  }
  if (tsem_post(&pRequest->body.rspSem) != 0) {
    tscError("failed to post semaphore");
  }
  (void)releaseRequest(pRequest->self);

EXIT:
  taosMemoryFree(param);
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  return code;
}
204

205
/*
 * Allocate a zero-initialised SMsgSendInfo and fill it from `pRequest`.
 * The target type is always TARGET_TYPE_MNODE and the response handler is
 * looked up from the request type.
 *
 * Returns the new object, or NULL when the allocation fails.
 */
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
  SMsgSendInfo* pInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (pInfo != NULL) {
    pInfo->requestObjRefId = pRequest->self;
    pInfo->requestId = pRequest->requestId;
    pInfo->param = pRequest;
    pInfo->msgType = pRequest->type;
    pInfo->target.type = TARGET_TYPE_MNODE;

    pInfo->msgInfo = pRequest->body.requestMsg;
    pInfo->fp = getMsgRspHandle(pRequest->type);
  }
  return pInfo;
}
218

219
/*
 * Handle the response to a create-database request.
 *
 * The message buffers are released first. On success the vgroup information
 * of the information-schema and performance-schema databases is refreshed in
 * the catalog; refresh failures are only logged. The request is then
 * completed through its callback, or its semaphore when no callback is set.
 *
 * Returns the incoming `code`, or the catalogGetHandle() result when that
 * call fails.
 */
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  // todo rsp with the vnode id list
  SRequestObj* pRequest = param;
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (TSDB_CODE_SUCCESS == code) {
    struct SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      STscObj* pTscObj = pRequest->pTscObj;

      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

      const char* sysDbs[] = {TSDB_INFORMATION_SCHEMA_DB, TSDB_PERFORMANCE_SCHEMA_DB};
      char        dbFName[TSDB_DB_FNAME_LEN];
      for (int32_t i = 0; i < (int32_t)tListLen(sysDbs); ++i) {
        (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, sysDbs[i]);
        if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
          tscError("QID:0x%" PRIx64 ", failed to refresh db vg info", pRequest->requestId);
        }
      }
    }
  } else {
    setErrno(pRequest, code);
  }

  if (!pRequest->body.queryFp) {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pRequest, code);
  return code;
}
257

258
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,929,670✔
259
  SRequestObj* pRequest = param;
2,929,670✔
260
  if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
2,929,670✔
261
      TSDB_CODE_MND_DB_IN_DROPPING == code) {
262
    SUseDbRsp usedbRsp = {0};
1,848✔
263
    if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
1,848✔
264
      tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
1,848✔
265
    }
266
    struct SCatalog* pCatalog = NULL;
1,848✔
267

268
    if (usedbRsp.vgVersion >= 0) {  // cached in local
1,848✔
269
      int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
1,848✔
270
      int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
1,848✔
271
      if (code1 != TSDB_CODE_SUCCESS) {
1,848✔
UNCOV
272
        tscWarn("QID:0x%" PRIx64 ", catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
×
273
                tstrerror(code1));
274
      } else {
275
        if (catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid) != 0) {
1,848✔
UNCOV
276
          tscError("QID:0x%" PRIx64 ", catalogRemoveDB failed, db:%s, uid:%" PRId64, pRequest->requestId, usedbRsp.db,
×
277
                   usedbRsp.uid);
278
        }
279
      }
280
    }
281
    tFreeSUsedbRsp(&usedbRsp);
1,848✔
282
  }
283

284
  if (code != TSDB_CODE_SUCCESS) {
2,929,750✔
285
    taosMemoryFree(pMsg->pData);
4,210✔
286
    taosMemoryFree(pMsg->pEpSet);
4,210✔
287
    setErrno(pRequest, code);
4,210✔
288

289
    if (pRequest->body.queryFp != NULL) {
4,210✔
290
      doRequestCallback(pRequest, pRequest->code);
4,210✔
291

292
    } else {
UNCOV
293
      if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
294
        tscError("failed to post semaphore");
×
295
      }
296
    }
297

298
    return code;
4,210✔
299
  }
300

301
  SUseDbRsp usedbRsp = {0};
2,925,540✔
302
  if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
2,925,540✔
UNCOV
303
    tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
×
304
  }
305

306
  if (strlen(usedbRsp.db) == 0) {
2,925,322✔
UNCOV
307
    taosMemoryFree(pMsg->pData);
×
308
    taosMemoryFree(pMsg->pEpSet);
×
309

UNCOV
310
    if (usedbRsp.errCode != 0) {
×
UNCOV
311
      return usedbRsp.errCode;
×
312
    } else {
313
      return TSDB_CODE_APP_ERROR;
×
314
    }
315
  }
316

317
  tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
2,925,322✔
318
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
5,414,925✔
319
    SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
2,489,547✔
320
    if (pInfo == NULL) {
2,489,547✔
UNCOV
321
      continue;
×
322
    }
323
    tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
2,489,547✔
324
    for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
5,198,138✔
325
      tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
2,708,591✔
326
    }
327
  }
328

329
  SName name = {0};
2,925,378✔
330
  if (tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB) != TSDB_CODE_SUCCESS) {
2,925,378✔
UNCOV
331
    tscError("QID:0x%" PRIx64 ", failed to parse db name:%s", pRequest->requestId, usedbRsp.db);
×
332
  }
333

334
  SUseDbOutput output = {0};
2,925,540✔
335
  code = queryBuildUseDbOutput(&output, &usedbRsp);
2,925,540✔
336
  if (code != 0) {
2,925,540✔
UNCOV
337
    terrno = code;
×
UNCOV
338
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
×
339

UNCOV
340
    tscError("QID:0x%" PRIx64 ", failed to build use db output since %s", pRequest->requestId, terrstr());
×
341
  } else if (output.dbVgroup && output.dbVgroup->vgHash) {
2,925,540✔
342
    struct SCatalog* pCatalog = NULL;
1,156,439✔
343

344
    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,156,439✔
345
    if (code1 != TSDB_CODE_SUCCESS) {
1,156,439✔
UNCOV
346
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
×
347
              tstrerror(code1));
348
    } else {
349
      if (catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup) != 0) {
1,156,439✔
UNCOV
350
        tscError("QID:0x%" PRIx64 ", failed to update db vg info, db:%s, dbId:%" PRId64, pRequest->requestId, output.db,
×
351
                 output.dbId);
352
      }
353
      output.dbVgroup = NULL;
1,156,439✔
354
    }
355
  }
356

357
  taosMemoryFreeClear(output.dbVgroup);
2,925,458✔
358
  tFreeSUsedbRsp(&usedbRsp);
2,925,540✔
359

360
  char db[TSDB_DB_NAME_LEN] = {0};
2,925,540✔
361
  if (tNameGetDbName(&name, db) != TSDB_CODE_SUCCESS) {
2,925,540✔
UNCOV
362
    tscError("QID:0x%" PRIx64 ", failed to get db name since %s", pRequest->requestId, tstrerror(code));
×
363
  }
364

365
  setConnectionDB(pRequest->pTscObj, db);
2,925,031✔
366

367
  taosMemoryFree(pMsg->pData);
2,925,540✔
368
  taosMemoryFree(pMsg->pEpSet);
2,925,540✔
369

370
  if (pRequest->body.queryFp != NULL) {
2,925,283✔
371
    doRequestCallback(pRequest, pRequest->code);
2,925,540✔
372
  } else {
UNCOV
373
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
374
      tscError("failed to post semaphore");
×
375
    }
376
  }
377
  return 0;
2,925,233✔
378
}
379

380
/*
 * Handle the response to a create-super-table request.
 *
 * Rejects a NULL `pMsg` or `param` with TSDB_CODE_TSC_INVALID_INPUT (the
 * message buffers are still released when only `param` is NULL). On success
 * the optional SMCreateStbRsp payload is decoded and its meta is stored as
 * the request's execution result. When an async callback is registered the
 * execution result is applied via handleCreateTbExecRes() before the
 * callback runs; otherwise the response semaphore is posted.
 *
 * Returns the final status code.
 */
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
  if (NULL == pMsg) {
    tscError("processCreateSTableRsp: invalid input param, pMsg is NULL");
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  if (NULL == param) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
    tscError("processCreateSTableRsp: invalid input param, param is NULL");
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SRequestObj* pRequest = param;
  SExecResult* pExecRes = &pRequest->body.resInfo.execRes;

  if (TSDB_CODE_SUCCESS == code) {
    SMCreateStbRsp createRsp = {0};
    SDecoder       decoder = {0};
    tDecoderInit(&decoder, pMsg->pData, pMsg->len);
    // An empty payload is not decoded; createRsp then stays zeroed.
    if (pMsg->len > 0) {
      code = tDecodeSMCreateStbRsp(&decoder, &createRsp);
      if (TSDB_CODE_SUCCESS != code) {
        setErrno(pRequest, code);
      }
    }
    tDecoderClear(&decoder);

    pExecRes->msgType = TDMT_MND_CREATE_STB;
    pExecRes->res = createRsp.pMeta;
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  if (NULL == pRequest->body.queryFp) {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  if (TSDB_CODE_SUCCESS == code) {
    SCatalog* pCatalog = NULL;
    int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (NULL != pExecRes->res) {
      ret = handleCreateTbExecRes(pExecRes->res, pCatalog);
    }
    if (TSDB_CODE_SUCCESS != ret) {
      code = ret;
    }
  }

  doRequestCallback(pRequest, code);
  return code;
}
438

439
/*
 * Handle the response to a drop-database request.
 *
 * On success the dropped database is removed from the catalog and the vgroup
 * information of the information-schema and performance-schema databases is
 * refreshed; those failures are only logged. The message buffers are then
 * released and the request is completed through its callback, or its
 * semaphore when no callback is set.
 *
 * Returns the incoming `code`, or the catalogGetHandle() result when that
 * call fails.
 */
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    SDropDbRsp dropdbRsp = {0};
    if (tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp) != 0) {
      tscError("QID:0x%" PRIx64 ", deserialize SDropDbRsp failed", pRequest->requestId);
    }

    struct SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (TSDB_CODE_SUCCESS == code) {
      if (catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid) != 0) {
        tscError("QID:0x%" PRIx64 ", failed to remove db:%s", pRequest->requestId, dropdbRsp.db);
      }

      STscObj*         pTscObj = pRequest->pTscObj;
      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

      const char* sysDbs[] = {TSDB_INFORMATION_SCHEMA_DB, TSDB_PERFORMANCE_SCHEMA_DB};
      char        dbFName[TSDB_DB_FNAME_LEN] = {0};
      for (int32_t i = 0; i < (int32_t)tListLen(sysDbs); ++i) {
        (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, sysDbs[i]);
        if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
          tscError("QID:0x%" PRIx64 ", failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
        }
      }
    }
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pRequest->body.queryFp) {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pRequest, code);
  return code;
}
484

485
/*
 * Handle the response to an alter-super-table request.
 *
 * On success the optional SMAlterStbRsp payload is decoded and its meta is
 * stored as the request's execution result. When an async callback is
 * registered the execution result is applied via handleAlterTbExecRes()
 * before the callback runs; otherwise the response semaphore is posted.
 *
 * Returns the final status code.
 */
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  SExecResult* pExecRes = &pRequest->body.resInfo.execRes;

  if (TSDB_CODE_SUCCESS == code) {
    SMAlterStbRsp alterRsp = {0};
    SDecoder      decoder = {0};
    tDecoderInit(&decoder, pMsg->pData, pMsg->len);
    // An empty payload is not decoded; alterRsp then stays zeroed.
    if (pMsg->len > 0) {
      code = tDecodeSMAlterStbRsp(&decoder, &alterRsp);
      if (TSDB_CODE_SUCCESS != code) {
        setErrno(pRequest, code);
      }
    }
    tDecoderClear(&decoder);

    pExecRes->msgType = TDMT_MND_ALTER_STB;
    pExecRes->res = alterRsp.pMeta;
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pRequest->body.queryFp) {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  if (TSDB_CODE_SUCCESS == code) {
    SCatalog* pCatalog = NULL;
    int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (NULL != pExecRes->res) {
      ret = handleAlterTbExecRes(pExecRes->res, pCatalog);
    }
    if (TSDB_CODE_SUCCESS != ret) {
      code = ret;
    }
  }

  doRequestCallback(pRequest, code);
  return code;
}
531

532
/*
 * Build a data block with SHOW_VARIABLES_RESULT_COLS varchar columns
 * (name, value, scope, category, info) holding one row per entry of `pVars`.
 *
 * pVars: array of SVariablesInfo.
 * block: receives the new block on success; left untouched on failure.
 *
 * Returns 0 on success, otherwise an error code. On failure everything
 * allocated here is released.
 */
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // All result columns are varchar; only their widths differ.
  const int32_t colBytes[] = {SHOW_VARIABLES_RESULT_FIELD1_LEN, SHOW_VARIABLES_RESULT_FIELD2_LEN,
                              SHOW_VARIABLES_RESULT_FIELD3_LEN, SHOW_VARIABLES_RESULT_FIELD4_LEN,
                              SHOW_VARIABLES_RESULT_FIELD5_LEN};
  for (int32_t col = 0; col < (int32_t)tListLen(colBytes); ++col) {
    SColumnInfoData infoData = {0};
    infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
    infoData.info.bytes = colBytes[col];
    TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
  }

  int32_t numOfCfg = taosArrayGetSize(pVars);
  code = blockDataEnsureCapacity(pBlock, numOfCfg);
  TSDB_CHECK_CODE(code, line, END);

  for (int32_t i = 0; i < numOfCfg; ++i) {
    SVariablesInfo* pInfo = taosArrayGet(pVars, i);
    TSDB_CHECK_NULL(pInfo, code, line, END, terrno);

    char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, name, false);
    TSDB_CHECK_CODE(code, line, END);

    char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, 1);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, value, false);
    TSDB_CHECK_CODE(code, line, END);

    char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, 2);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, scope, false);
    TSDB_CHECK_CODE(code, line, END);

    char category[TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(category, pInfo->category, TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, 3);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, category, false);
    TSDB_CHECK_CODE(code, line, END);

    char info[TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(info, pInfo->info, TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, 4);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, info, false);
    TSDB_CHECK_CODE(code, line, END);
  }

  pBlock->info.rows = numOfCfg;

  *block = pBlock;
  return code;

END:
  // pBlock is NULL when the very first allocation failed; it must not be
  // dereferenced in that case.
  if (pBlock != NULL) {
    if (pBlock->pDataBlock != NULL) {
      // Also releases the per-column buffers that blockDataEnsureCapacity()
      // may already have allocated, in addition to the array and the block.
      blockDataDestroy(pBlock);
    } else {
      taosMemoryFree(pBlock);
    }
  }
  return code;
}
616

617
/*
 * Encode the variables in `pVars` as an SRetrieveTableRsp.
 *
 * pVars: array of SVariablesInfo.
 * pRsp:  receives the newly allocated response on success; it is freed and
 *        reset to NULL when a failure occurs after allocation.
 *
 * numOfRows, numOfCols, payloadLen and compLen are stored in network byte
 * order. Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildShowVariablesBlock(pVars, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;

  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);

  // Test the host-order row count. (*pRsp)->numOfRows has already been
  // byte-swapped, so its sign/magnitude no longer reflects the real count
  // (e.g. 128 rows becomes 0x8000000000000000 on a little-endian host).
  if (pBlock->info.rows > 0) {
    int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SHOW_VARIABLES_RESULT_COLS);
    if (len < 0) {
      uError("buildShowVariablesRsp error, len:%d", len);
      code = terrno;
      goto _exit;
    }
    SET_PAYLOAD_LEN((*pRsp)->data, len, len);

    int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
    (*pRsp)->payloadLen = htonl(payloadLen);
    (*pRsp)->compLen = htonl(payloadLen);

    // The encoded size must match exactly what was reserved for it.
    if ((size_t)payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
      uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
             (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
      code = TSDB_CODE_TSC_INVALID_INPUT;
      goto _exit;
    }
  }

  blockDataDestroy(pBlock);
  pBlock = NULL;

  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
677

678
/*
 * Handle the response to a show-variables request.
 *
 * On success the SShowVariablesRsp payload is decoded, converted to an
 * SRetrieveTableRsp and installed as the request's query result. If any of
 * those steps fails, the converted response is freed and the request's
 * pRspMsg is reset to NULL. The message buffers are then released and the
 * request is completed through its callback, or its semaphore when no
 * callback is set.
 *
 * Returns the final status code.
 */
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS == code) {
    SShowVariablesRsp  rsp = {0};
    SRetrieveTableRsp* pRetrieveRsp = NULL;

    code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp);
    if (TSDB_CODE_SUCCESS == code) {
      code = buildShowVariablesRsp(rsp.variables, &pRetrieveRsp);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRetrieveRsp, false, pRequest->stmtBindVersion > 0);
    }

    if (0 != code) {
      pRequest->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieveRsp);
    }
    tFreeSShowVariablesRsp(&rsp);
  } else {
    setErrno(pRequest, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pRequest->body.queryFp) {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
    return code;
  }

  doRequestCallback(pRequest, code);
  return code;
}
712

713
/*
 * Build the one-row result block for a compact-database response.
 *
 * The block has three columns:
 *   0: result (VARCHAR, COMPACT_DB_RESULT_FIELD1_LEN) - "accepted" / "rejected"
 *   1: id     (INT)                                   - pRsp->compactId, or NULL when rejected
 *   2: reason (VARCHAR, COMPACT_DB_RESULT_FIELD3_LEN) - "success" / "compaction is ongoing"
 *
 * pRsp  - deserialized response; bAccepted and compactId are read
 * block - receives the new block on success; left untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the failing step. On
 * failure the partially built block is released here.
 */
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // Column schema: result (varchar), id (int), reason (varchar).
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  infoData.info.type = TSDB_DATA_TYPE_INT;
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);

  char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
  char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
  if (pRsp->bAccepted) {
    STR_TO_VARSTR(result, "accepted");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
    TSDB_CHECK_CODE(code, line, END);
    STR_TO_VARSTR(reason, "success");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  } else {
    STR_TO_VARSTR(result, "rejected");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    // A rejected request has no id to report.
    colDataSetNULL(pIdCol, 0);
    STR_TO_VARSTR(reason, "compaction is ongoing");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  }
  pBlock->info.rows = 1;

  *block = pBlock;

  return TSDB_CODE_SUCCESS;

END:
  // pBlock is NULL when the very first allocation failed. Otherwise release
  // the whole block in one call; the previous code freed pBlock and then read
  // pBlock->pDataBlock (use-after-free).
  // NOTE(review): relies on blockDataDestroy() accepting a block whose
  // pDataBlock is still NULL or only partially populated - confirm.
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
  }
  return code;
}
775

776
/*
 * Wrap a compact-database response into a freshly allocated SRetrieveTableRsp.
 *
 * pCompactDb - deserialized response used to build the one-row result block
 * pRsp       - receives the allocated response on success; on failure after
 *              allocation it is freed and reset to NULL
 *
 * The encoded block is written at data + PAYLOAD_PREFIX_LEN and the prefix is
 * filled by SET_PAYLOAD_LEN. numOfRows is stored with htobe64 and
 * numOfCols / payloadLen / compLen with htonl.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildCompactDbBlock(pCompactDb, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;
  (*pRsp)->compLen = 0;
  (*pRsp)->payloadLen = 0;
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);

  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, COMPACT_DB_RESULT_COLS);
  if (len < 0) {
    uError("buildRetriveTableRspForCompactDb error, len:%d", len);
    code = terrno;
    goto _exit;
  }

  // The block is not needed once encoded. Clear the pointer so the _exit path
  // below cannot destroy it a second time (previously a double free when the
  // payload-length check failed).
  blockDataDestroy(pBlock);
  pBlock = NULL;

  SET_PAYLOAD_LEN((*pRsp)->data, len, len);

  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
  (*pRsp)->payloadLen = htonl(payloadLen);
  (*pRsp)->compLen = htonl(payloadLen);

  // Sanity check: the encoded size must match what was allocated for it.
  if ((size_t)payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
    uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto _exit;
  }

  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
833

834
/*
 * Response callback for a compact-database request.
 *
 * param - the SRequestObj the response belongs to
 * pMsg  - raw response; pMsg->pData and pMsg->pEpSet are always freed here
 * code  - status delivered with the response
 *
 * On a successful status the payload is deserialized into an SCompactDbRsp,
 * turned into an SRetrieveTableRsp and installed as the request's query
 * result. The request is then completed by invoking its query callback
 * directly or, when none is set, by posting its response semaphore.
 *
 * Returns the final status code.
 */
int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SCompactDbRsp      compactRsp = {0};
    SRetrieveTableRsp* pRetrieve = NULL;

    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, &compactRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildRetriveTableRspForCompactDb(&compactRsp, &pRetrieve);
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = setQueryResultFromRsp(&pReq->body.resInfo, pRetrieve, false, pReq->stmtBindVersion > 0);
    }

    if (code != 0) {
      // Detach the buffer from the result info before releasing it.
      pReq->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieve);
    }
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    // No callback registered: signal the response semaphore instead.
    if (tsem_post(&pReq->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  } else {
    pReq->body.queryFp(((SSyncQueryParam*)pReq->body.interParam)->userParam, pReq, code);
  }

  return code;
}
867

868
/*
 * Build the one-row result block for a scan-database response.
 *
 * The block has three columns:
 *   0: result (VARCHAR, SCAN_DB_RESULT_FIELD1_LEN) - "accepted" / "rejected"
 *   1: id     (INT)                                - pRsp->scanId, or NULL when rejected
 *   2: reason (VARCHAR, SCAN_DB_RESULT_FIELD3_LEN) - "success" / "scan is ongoing"
 *
 * pRsp  - deserialized response; bAccepted and scanId are read
 * block - receives the new block on success; left untouched on failure
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the failing step. On
 * failure the partially built block is released here.
 */
static int32_t buildScanDbBlock(SScanDbRsp* pRsp, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(SCAN_DB_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // Column schema: result (varchar), id (int), reason (varchar).
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = SCAN_DB_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  infoData.info.type = TSDB_DATA_TYPE_INT;
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = SCAN_DB_RESULT_FIELD3_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);

  char result[SCAN_DB_RESULT_FIELD1_LEN] = {0};
  char reason[SCAN_DB_RESULT_FIELD3_LEN] = {0};
  if (pRsp->bAccepted) {
    STR_TO_VARSTR(result, "accepted");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->scanId, false);
    TSDB_CHECK_CODE(code, line, END);
    STR_TO_VARSTR(reason, "success");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  } else {
    STR_TO_VARSTR(result, "rejected");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    // A rejected request has no id to report.
    colDataSetNULL(pIdCol, 0);
    STR_TO_VARSTR(reason, "scan is ongoing");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  }
  pBlock->info.rows = 1;

  *block = pBlock;

  return TSDB_CODE_SUCCESS;

END:
  // pBlock is NULL when the very first allocation failed. Otherwise release
  // the whole block in one call; the previous code freed pBlock and then read
  // pBlock->pDataBlock (use-after-free).
  // NOTE(review): relies on blockDataDestroy() accepting a block whose
  // pDataBlock is still NULL or only partially populated - confirm.
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
  }
  return code;
}
930

931
/*
 * Wrap a scan-database response into a freshly allocated SRetrieveTableRsp.
 *
 * pScanDb - deserialized response used to build the one-row result block
 * pRsp    - receives the allocated response on success; on failure after
 *           allocation it is freed and reset to NULL
 *
 * The encoded block is written at data + PAYLOAD_PREFIX_LEN and the prefix is
 * filled by SET_PAYLOAD_LEN. numOfRows is stored with htobe64 and
 * numOfCols / payloadLen / compLen with htonl.
 *
 * Returns TSDB_CODE_SUCCESS or an error code.
 */
static int32_t buildRetriveTableRspForScanDb(SScanDbRsp* pScanDb, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildScanDbBlock(pScanDb, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;
  (*pRsp)->compLen = 0;
  (*pRsp)->payloadLen = 0;
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(SCAN_DB_RESULT_COLS);

  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SCAN_DB_RESULT_COLS);
  if (len < 0) {
    uError("%s error, len:%d", __func__, len);
    code = terrno;
    goto _exit;
  }

  // The block is not needed once encoded. Clear the pointer so the _exit path
  // below cannot destroy it a second time (previously a double free when the
  // payload-length check failed).
  blockDataDestroy(pBlock);
  pBlock = NULL;

  SET_PAYLOAD_LEN((*pRsp)->data, len, len);

  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
  (*pRsp)->payloadLen = htonl(payloadLen);
  (*pRsp)->compLen = htonl(payloadLen);

  // Sanity check: the encoded size must match what was allocated for it.
  if ((size_t)payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
    uError("%s error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, __func__, len,
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto _exit;
  }

  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
988

989
/*
 * Response callback for a scan-database request.
 *
 * param - the SRequestObj the response belongs to
 * pMsg  - raw response; pMsg->pData and pMsg->pEpSet are always freed here
 * code  - status delivered with the response
 *
 * On a successful status the payload is deserialized into an SScanDbRsp,
 * turned into an SRetrieveTableRsp and installed as the request's query
 * result. The request is then completed by invoking its query callback
 * directly or, when none is set, by posting its response semaphore.
 *
 * Returns the final status code.
 */
static int32_t processScanDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SScanDbRsp         scanRsp = {0};
    SRetrieveTableRsp* pRetrieve = NULL;

    code = tDeserializeSScanDbRsp(pMsg->pData, pMsg->len, &scanRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildRetriveTableRspForScanDb(&scanRsp, &pRetrieve);
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = setQueryResultFromRsp(&pReq->body.resInfo, pRetrieve, false, pReq->stmtBindVersion > 0);
    }

    if (code != 0) {
      // Detach the buffer from the result info before releasing it.
      pReq->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieve);
    }
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    // No callback registered: signal the response semaphore instead.
    if (tsem_post(&pReq->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  } else {
    pReq->body.queryFp(((SSyncQueryParam*)pReq->body.interParam)->userParam, pReq, code);
  }

  return code;
}
1022

1023
/*
 * Response callback for a trim-database request.
 *
 * param - the SRequestObj the response belongs to
 * pMsg  - raw response; pMsg->pData and pMsg->pEpSet are always freed here
 * code  - status delivered with the response
 *
 * The trim response is decoded with the compact-db deserializer and rendered
 * with the compact-db result builder.
 * NOTE(review): this presumes STrimDbRsp and SCompactDbRsp share a layout -
 * confirm against their declarations.
 *
 * The request is completed by invoking its query callback directly or, when
 * none is set, by posting its response semaphore.
 *
 * Returns the final status code.
 */
int32_t processTrimDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    STrimDbRsp         trimRsp = {0};
    SRetrieveTableRsp* pRetrieve = NULL;

    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, (SCompactDbRsp*)&trimRsp);
    if (code == TSDB_CODE_SUCCESS) {
      code = buildRetriveTableRspForCompactDb(&trimRsp, &pRetrieve);
    }
    if (code == TSDB_CODE_SUCCESS) {
      code = setQueryResultFromRsp(&pReq->body.resInfo, pRetrieve, false, pReq->stmtBindVersion > 0);
    }

    if (code != 0) {
      // Detach the buffer from the result info before releasing it.
      pReq->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pRetrieve);
    }
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    // No callback registered: signal the response semaphore instead.
    if (tsem_post(&pReq->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  } else {
    pReq->body.queryFp(((SSyncQueryParam*)pReq->body.interParam)->userParam, pReq, code);
  }

  return code;
}
1056

1057
/*
 * Select the response callback for a message type.
 *
 * msgType - the TDMT_* message type of the request
 *
 * Returns the dedicated handler for the message types listed below, and
 * genericRspCallback for every other type.
 */
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
  // Fall back to the generic handler unless a dedicated one is matched.
  __async_send_cb_fn_t handler = genericRspCallback;

  switch (msgType) {
    case TDMT_MND_CONNECT:
      handler = processConnectRsp;
      break;
    case TDMT_MND_CREATE_DB:
      handler = processCreateDbRsp;
      break;
    case TDMT_MND_USE_DB:
      handler = processUseDbRsp;
      break;
    case TDMT_MND_CREATE_STB:
      handler = processCreateSTableRsp;
      break;
    case TDMT_MND_DROP_DB:
      handler = processDropDbRsp;
      break;
    case TDMT_MND_ALTER_STB:
      handler = processAlterStbRsp;
      break;
    case TDMT_MND_SHOW_VARIABLES:
      handler = processShowVariablesRsp;
      break;
    case TDMT_MND_COMPACT_DB:
      handler = processCompactDbRsp;
      break;
    case TDMT_MND_TRIM_DB:
      handler = processTrimDbRsp;
      break;
    case TDMT_MND_SCAN_DB:
      handler = processScanDbRsp;
      break;
    default:
      break;
  }

  return handler;
}
1083

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc