• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5034

24 Apr 2026 11:25AM UTC coverage: 73.058%. Remained the same
#5034

push

travis-ci

web-flow
merge: from main to 3.0 branch #35224

merge: from main to 3.0 branch[manual-only]

1336 of 1975 new or added lines in 48 files covered. (67.65%)

14149 existing lines in 164 files now uncovered.

275896 of 377640 relevant lines covered (73.06%)

132944440.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.61
/source/client/src/clientMsgHandler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "cmdnodes.h"
22
#include "command.h"
23
#include "os.h"
24
#include "query.h"
25
#include "systable.h"
26
#include "tdatablock.h"
27
#include "tdef.h"
28
#include "tglobal.h"
29
#include "tmsg.h"
30
#include "tname.h"
31
#include "tversion.h"
32

33
extern SClientHbMgr clientHbMgr;
34

35
/*
 * Record `code` as the result of `pRequest` and mirror the same value into
 * terrno so both carry the outcome of the request.
 */
static void setErrno(SRequestObj* pRequest, int32_t code) {
  terrno = code;
  pRequest->code = code;
}
5,752,681✔
39

40
/*
 * Generic response handler for requests that need no type-specific decoding.
 *
 * param: the SRequestObj the response belongs to.
 * pMsg:  response buffer; pMsg->pEpSet and pMsg->pData are released here,
 *        except when pData is adopted as the request's message buffer.
 * code:  result code of the response; stored on the request and returned.
 *
 * On failure with a non-empty payload, the payload is kept in
 * pRequest->msgBuf as the error detail text.
 */
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  setErrno(pRequest, code);

  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
    if (removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)) != 0) {
      tscError("failed to remove meta data for table");
    }
  }

  // Preserve MNode custom error detail string (e.g. MAC preflight user list)
  if (code != TSDB_CODE_SUCCESS && pMsg->pData != NULL && pMsg->len > 0) {
    if (pMsg->len <= pRequest->msgBufLen) {
      // Fits in the existing buffer: copy, leaving room for the terminator.
      int32_t copyLen = TMIN((int32_t)pMsg->len, pRequest->msgBufLen - 1);
      memcpy(pRequest->msgBuf, (char*)pMsg->pData, copyLen);
      pRequest->msgBuf[copyLen] = '\0';
    } else {
      // Too large for the existing buffer: take ownership of the payload
      // instead of copying it. pData is cleared so it is not freed below.
      taosMemoryFreeClear(pRequest->msgBuf);
      pRequest->msgBuf = pMsg->pData;
      pMsg->pData = NULL;
      pRequest->msgBufLen = pMsg->len;
      // The copy branch always terminates the text; do the same here so the
      // adopted buffer is a valid C string even if the payload carried no
      // trailing NUL (len > 0 is guaranteed by the enclosing condition).
      pRequest->msgBuf[pRequest->msgBufLen - 1] = '\0';
    }
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  // Notify the requester: either through its callback or by posting rspSem.
  if (pRequest->body.queryFp != NULL) {
    doRequestCallback(pRequest, code);
  } else {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  }
  return code;
}
75

76
/*
 * Handle the response of a connect request.
 *
 * param: heap-allocated int64_t holding the request ref id; freed here.
 * pMsg:  response buffer; pMsg->pEpSet and pMsg->pData are freed here.
 * code:  result code of the response.
 *
 * On success the connection object is updated from the decoded SConnectRsp
 * (endpoint set, versions, ids, server config), session metrics are updated
 * and the connection is registered with the heartbeat manager. The waiter is
 * always woken through rspSem once the request could be acquired.
 * Returns the final result code.
 */
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = acquireRequest(*(int64_t*)param);
  if (pRequest == NULL) {
    goto EXIT;
  }

  if (TSDB_CODE_SUCCESS != code) {
    goto End;
  }

  STscObj* pTscObj = pRequest->pTscObj;

  if (pTscObj->pAppInfo == NULL) {
    code = TSDB_CODE_TSC_DISCONNECTED;
    goto End;
  }

  if (CONN_TYPE__AUTH_TEST == pTscObj->connType) {
    // auth test connection, no need to process connect rsp
    goto End;
  }

  SConnectRsp rsp = {0};
  if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &rsp) != 0) {
    code = TSDB_CODE_TSC_INVALID_VERSION;
    goto End;
  }

  code = taosCheckVersionCompatibleFromStr(td_version, rsp.sVer, 3);
  if (code != 0) {
    tscError("version not compatible. client version:%s, server version:%s", td_version, rsp.sVer);
    goto End;
  }

  // Reject the connection when client and server clocks differ too much.
  int32_t nowSec = taosGetTimestampSec();
  int32_t delta = abs(nowSec - rsp.svrTimestamp);
  if (delta > tsTimestampDeltaLimit) {
    code = TSDB_CODE_TIME_UNSYNCED;
    tscError("time diff:%ds is too big", delta);
    goto End;
  }

  if (0 == rsp.epSet.numOfEps) {
    code = TSDB_CODE_APP_ERROR;
    goto End;
  }

  // With a single dnode and a single configured endpoint, only the default
  // rpc address is set and the stored mgmt endpoint set is left untouched.
  bool   needEpSetUpdate = true;
  SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
  if (rsp.dnodeNum == 1 && srcEpSet.numOfEps == 1) {
    if (rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
                          rsp.epSet.eps[rsp.epSet.inUse].fqdn) != 0) {
      tscError("failed to set default addr for rpc");
    }
    needEpSetUpdate = false;
  }

  if (needEpSetUpdate && !isEpsetEqual(&srcEpSet, &rsp.epSet)) {
    SEpSet curEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
    SEp*   pOldEp = &curEpSet.eps[curEpSet.inUse];
    SEp*   pNewEp = &rsp.epSet.eps[rsp.epSet.inUse];
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", curEpSet.inUse, curEpSet.numOfEps,
             pOldEp->fqdn, pOldEp->port, rsp.epSet.inUse, rsp.epSet.numOfEps, pNewEp->fqdn, pNewEp->port);
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &rsp.epSet);
  }

  for (int32_t idx = 0; idx < rsp.epSet.numOfEps; ++idx) {
    tscDebug("QID:0x%" PRIx64 ", epSet.fqdn[%d]:%s port:%d, conn:0x%" PRIx64, pRequest->requestId, idx,
             rsp.epSet.eps[idx].fqdn, rsp.epSet.eps[idx].port, pTscObj->id);
  }

  // Copy per-connection attributes from the response.
  pTscObj->sysInfo = rsp.sysInfo;
  pTscObj->minSecLevel = rsp.minSecLevel;
  pTscObj->maxSecLevel = rsp.maxSecLevel;
  pTscObj->connId = rsp.connId;
  pTscObj->acctId = rsp.acctId;
  if (pTscObj->user[0] != 0) {
    pTscObj->tokenName[0] = 0;
  } else {
    // No user name on the connection yet: take user and token name from the response.
    tstrncpy(pTscObj->user, rsp.user, tListLen(pTscObj->user));
    tstrncpy(pTscObj->tokenName, rsp.tokenName, tListLen(pTscObj->tokenName));
  }
  tstrncpy(pTscObj->sVer, rsp.sVer, tListLen(pTscObj->sVer));
  tstrncpy(pTscObj->sDetailVer, rsp.sDetailVer, tListLen(pTscObj->sDetailVer));

  // update the appInstInfo
  pTscObj->pAppInfo->clusterId = rsp.clusterId;
  pTscObj->pAppInfo->serverCfg.monitorParas = rsp.monitorParas;
  pTscObj->pAppInfo->serverCfg.enableAuditDelete = rsp.enableAuditDelete;
  pTscObj->pAppInfo->serverCfg.enableAuditSelect = rsp.enableAuditSelect;
  pTscObj->pAppInfo->serverCfg.enableAuditInsert = rsp.enableAuditInsert;
  pTscObj->pAppInfo->serverCfg.auditLevel = rsp.auditLevel;
  pTscObj->pAppInfo->serverCfg.sodInitial = rsp.sodInitial;
  pTscObj->pAppInfo->serverCfg.macActive = rsp.macActive;
  tscDebug("monitor paras from connect rsp, clusterId:0x%" PRIx64 ", threshold:%d scope:%d",
           rsp.clusterId, rsp.monitorParas.tsSlowLogThreshold, rsp.monitorParas.tsSlowLogScope);
  lastClusterId = rsp.clusterId;

  pTscObj->connType = rsp.connType;
  pTscObj->passInfo.ver = rsp.passVer;
  pTscObj->authVer = rsp.authVer;
  pTscObj->whiteListInfo.ver = rsp.whiteListVer;
  pTscObj->userId = rsp.userId;

  // First connection seen for this cluster id: index the app instance by
  // cluster id and (when built with USE_MONITOR) start the monitor state.
  if (NULL == taosHashGet(appInfo.pInstMapByClusterId, &rsp.clusterId, LONG_BYTES)) {
    if (taosHashPut(appInfo.pInstMapByClusterId, &rsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo,
                    POINTER_BYTES) != 0) {
      tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
    } else {
#ifdef USE_MONITOR
      MonitorSlowLogData data = {0};
      data.clusterId = pTscObj->pAppInfo->clusterId;
      data.type = SLOW_LOG_READ_ALL;
      (void)monitorPutData2MonitorQueue(data);  // ignore return code
      monitorClientSlowQueryInit(rsp.clusterId);
      monitorClientSQLReqInit(rsp.clusterId);
#endif
    }
  }

  code = tscRefSessMetric(pTscObj);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to connect with user:%s, code:%s", pTscObj->user, tstrerror(code));
    goto End;
  }

  sessMetricRef(pTscObj->pSessMetric);

  SSessParam sessParam = {.type = SESSION_PER_USER, .value = 1};
  code = tscUpdateSessMetric(pTscObj, &sessParam);
  if (code != TSDB_CODE_SUCCESS) {
    tscError("failed to connect with user:%s, code:%s", pTscObj->user, tstrerror(code));
    goto End;
  }

  // Register the connection with its heartbeat manager under the manager lock.
  (void)taosThreadMutexLock(&clientHbMgr.lock);
  SAppHbMgr* pHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
  if (pHbMgr == NULL) {
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
    code = TSDB_CODE_TSC_DISCONNECTED;
    goto End;
  }
  if (hbRegisterConn(pHbMgr, pTscObj->id, pTscObj->user, pTscObj->tokenName, rsp.clusterId, rsp.connType) != 0) {
    tscError("QID:0x%" PRIx64 ", failed to register conn to hbMgr", pRequest->requestId);
  }
  (void)taosThreadMutexUnlock(&clientHbMgr.lock);

  tscDebug("QID:0x%" PRIx64 ", clusterId:0x%" PRIx64 ", totalConn:%" PRId64, pRequest->requestId, rsp.clusterId,
           pTscObj->pAppInfo->numOfConns);

End:
  // pRequest is always valid here; the NULL case jumps straight to EXIT.
  if (code != 0) {
    setErrno(pRequest, code);
  }
  if (tsem_post(&pRequest->body.rspSem) != 0) {
    tscError("failed to post semaphore");
  }

  (void)releaseRequest(pRequest->self);

EXIT:
  taosMemoryFree(param);
  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);
  return code;
}
249

250
/*
 * Allocate an SMsgSendInfo for `pRequest`, targeted at the mnode.
 *
 * The request's ids, type and prepared request message are copied into the
 * send info, and the response handler is looked up from the request type.
 * Returns NULL when the allocation fails.
 */
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
  SMsgSendInfo* pSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
  if (NULL == pSendInfo) {
    return NULL;
  }

  // Identity of the originating request.
  pSendInfo->requestObjRefId = pRequest->self;
  pSendInfo->requestId = pRequest->requestId;
  pSendInfo->param = pRequest;

  // Routing: message type and target.
  pSendInfo->msgType = pRequest->type;
  pSendInfo->target.type = TARGET_TYPE_MNODE;

  // Payload and the handler to run on the response.
  pSendInfo->msgInfo = pRequest->body.requestMsg;
  pSendInfo->fp = getMsgRspHandle(pRequest->type);
  return pSendInfo;
}
263

264
/*
 * Handle the response of a create-database request.
 *
 * param: the SRequestObj the response belongs to.
 * pMsg:  response buffer; its pData and pEpSet are freed immediately.
 * code:  result code of the response.
 *
 * On success the vgroup info of the information-schema and
 * performance-schema databases is refreshed in the catalog; refresh failures
 * are only logged. The requester is then notified through its callback or
 * rspSem. Returns the response code, or the catalogGetHandle error if that
 * call failed.
 */
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  // todo rsp with the vnode id list
  SRequestObj* pRequest = param;
  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (TSDB_CODE_SUCCESS != code) {
    setErrno(pRequest, code);
  } else {
    struct SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (code == TSDB_CODE_SUCCESS) {
      STscObj* pTscObj = pRequest->pTscObj;

      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

      // Both system databases get the same refresh, in this order.
      const char* sysDbs[] = {TSDB_INFORMATION_SCHEMA_DB, TSDB_PERFORMANCE_SCHEMA_DB};
      char        dbFName[TSDB_DB_FNAME_LEN];
      for (size_t k = 0; k < sizeof(sysDbs) / sizeof(sysDbs[0]); ++k) {
        (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, sysDbs[k]);
        if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
          tscError("QID:0x%" PRIx64 ", failed to refresh db vg info", pRequest->requestId);
        }
      }
    }
  }

  if (pRequest->body.queryFp == NULL) {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  } else {
    doRequestCallback(pRequest, code);
  }
  return code;
}
302

303
/*
 * Handle the response of a use-database request.
 *
 * param: the SRequestObj the response belongs to.
 * pMsg:  response buffer; pMsg->pData and pMsg->pEpSet are freed on every path.
 * code:  result code of the response.
 *
 * When the database does not exist or is being created/dropped, the locally
 * cached catalog entry for it is removed. On success the vgroup info from the
 * response is pushed into the catalog and the connection's current database
 * is set. The requester is notified (callback or rspSem) on every path.
 *
 * Returns `code` for a failed response, the response's errCode (or
 * TSDB_CODE_APP_ERROR) when the decoded db name is empty, otherwise 0.
 */
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
      TSDB_CODE_MND_DB_IN_DROPPING == code) {
    SUseDbRsp usedbRsp = {0};
    if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
      tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
    }
    struct SCatalog* pCatalog = NULL;

    if (usedbRsp.vgVersion >= 0) {  // cached in local
      int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
      int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
      if (code1 != TSDB_CODE_SUCCESS) {
        tscWarn("QID:0x%" PRIx64 ", catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
                tstrerror(code1));
      } else {
        if (catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid) != 0) {
          tscError("QID:0x%" PRIx64 ", catalogRemoveDB failed, db:%s, uid:%" PRId64, pRequest->requestId, usedbRsp.db,
                   usedbRsp.uid);
        }
      }
    }
    tFreeSUsedbRsp(&usedbRsp);
  }

  if (code != TSDB_CODE_SUCCESS) {
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    setErrno(pRequest, code);

    if (pRequest->body.queryFp != NULL) {
      doRequestCallback(pRequest, pRequest->code);

    } else {
      if (tsem_post(&pRequest->body.rspSem) != 0) {
        tscError("failed to post semaphore");
      }
    }

    return code;
  }

  SUseDbRsp usedbRsp = {0};
  if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
    tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
  }

  if (strlen(usedbRsp.db) == 0) {
    // A response without a db name is unusable. Release everything decoded so
    // far, record the error on the request and wake the requester exactly as
    // the other failure path above does; returning silently would leak the
    // decoded response and leave the requester without a completion signal.
    code = (usedbRsp.errCode != 0) ? usedbRsp.errCode : TSDB_CODE_APP_ERROR;
    tFreeSUsedbRsp(&usedbRsp);
    taosMemoryFree(pMsg->pData);
    taosMemoryFree(pMsg->pEpSet);
    setErrno(pRequest, code);

    if (pRequest->body.queryFp != NULL) {
      doRequestCallback(pRequest, pRequest->code);
    } else {
      if (tsem_post(&pRequest->body.rspSem) != 0) {
        tscError("failed to post semaphore");
      }
    }

    return code;
  }

  tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
    SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
    if (pInfo == NULL) {
      continue;
    }
    tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
    for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
      tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
    }
  }

  SName name = {0};
  if (tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB) != TSDB_CODE_SUCCESS) {
    tscError("QID:0x%" PRIx64 ", failed to parse db name:%s", pRequest->requestId, usedbRsp.db);
  }

  SUseDbOutput output = {0};
  code = queryBuildUseDbOutput(&output, &usedbRsp);
  if (code != 0) {
    terrno = code;
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);

    tscError("QID:0x%" PRIx64 ", failed to build use db output since %s", pRequest->requestId, terrstr());
  } else if (output.dbVgroup && output.dbVgroup->vgHash) {
    struct SCatalog* pCatalog = NULL;

    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (code1 != TSDB_CODE_SUCCESS) {
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
              tstrerror(code1));
      // The vgroup info was not handed to the catalog, so its hash must be
      // released here; only the container struct is freed further below.
      taosHashCleanup(output.dbVgroup->vgHash);
    } else {
      if (catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup) != 0) {
        tscError("QID:0x%" PRIx64 ", failed to update db vg info, db:%s, dbId:%" PRId64, pRequest->requestId, output.db,
                 output.dbId);
      }
      // Cleared after the update call so the free below does not touch it.
      output.dbVgroup = NULL;
    }
  }

  taosMemoryFreeClear(output.dbVgroup);
  tFreeSUsedbRsp(&usedbRsp);

  char    db[TSDB_DB_NAME_LEN] = {0};
  int32_t nameCode = tNameGetDbName(&name, db);
  if (nameCode != TSDB_CODE_SUCCESS) {
    // Report the error of this call rather than the unrelated, earlier `code`.
    tscError("QID:0x%" PRIx64 ", failed to get db name since %s", pRequest->requestId, tstrerror(nameCode));
  }

  setConnectionDB(pRequest->pTscObj, db);

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (pRequest->body.queryFp != NULL) {
    doRequestCallback(pRequest, pRequest->code);
  } else {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  }
  return 0;
}
424

425
/*
 * Handle the response of a create-super-table request.
 *
 * param: the SRequestObj the response belongs to; must not be NULL.
 * pMsg:  response buffer; must not be NULL. Its pEpSet and pData are freed
 *        here (also when `param` is NULL).
 * code:  result code of the response.
 *
 * On success a non-empty payload is decoded as SMCreateStbRsp and its meta is
 * stored as the request's execution result. For callback-style requests the
 * result is additionally applied to the catalog before the callback runs;
 * otherwise rspSem is posted.
 *
 * Returns TSDB_CODE_TSC_INVALID_INPUT for NULL arguments, otherwise the final
 * result code.
 */
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
  if (pMsg == NULL) {
    tscError("processCreateSTableRsp: invalid input param, pMsg is NULL");
    return TSDB_CODE_TSC_INVALID_INPUT;
  }
  if (param == NULL) {
    taosMemoryFree(pMsg->pEpSet);
    taosMemoryFree(pMsg->pData);
    tscError("processCreateSTableRsp: invalid input param, param is NULL");
    return TSDB_CODE_TSC_INVALID_INPUT;
  }

  SRequestObj* pRequest = param;

  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  } else {
    SMCreateStbRsp createRsp = {0};
    SDecoder       coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    // An empty payload is valid and leaves createRsp zeroed (no meta).
    if (pMsg->len > 0) {
      code = tDecodeSMCreateStbRsp(&coder, &createRsp);
      if (code != TSDB_CODE_SUCCESS) {
        setErrno(pRequest, code);
      }
    }
    tDecoderClear(&coder);

    pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
    pRequest->body.resInfo.execRes.res = createRsp.pMeta;
  }

  taosMemoryFree(pMsg->pEpSet);
  taosMemoryFree(pMsg->pData);

  if (pRequest->body.queryFp != NULL) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;

    if (code == TSDB_CODE_SUCCESS) {
      SCatalog* pCatalog = NULL;
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
      // Only apply the result when a catalog handle was obtained; otherwise
      // report the catalogGetHandle failure instead of overwriting it and
      // handing a NULL catalog to the handler.
      if (ret == TSDB_CODE_SUCCESS && pRes->res != NULL) {
        ret = handleCreateTbExecRes(pRes->res, pCatalog);
      }

      if (ret != TSDB_CODE_SUCCESS) {
        code = ret;
      }
    }

    doRequestCallback(pRequest, code);
  } else {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  }
  return code;
}
483

484
/*
 * Handle the response of a drop-database request.
 *
 * param: the SRequestObj the response belongs to.
 * pMsg:  response buffer; its pData and pEpSet are freed here.
 * code:  result code of the response.
 *
 * On success the dropped database is removed from the catalog and the vgroup
 * info of the information-schema and performance-schema databases is
 * refreshed; failures of those steps are only logged. The requester is then
 * notified through its callback or rspSem. Returns the response code, or the
 * catalogGetHandle error if that call failed.
 */
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;

  if (TSDB_CODE_SUCCESS != code) {
    setErrno(pRequest, code);
  } else {
    SDropDbRsp dropdbRsp = {0};
    if (tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp) != 0) {
      tscError("QID:0x%" PRIx64 ", deserialize SDropDbRsp failed", pRequest->requestId);
    }

    struct SCatalog* pCatalog = NULL;
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
    if (code == TSDB_CODE_SUCCESS) {
      if (catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid) != 0) {
        tscError("QID:0x%" PRIx64 ", failed to remove db:%s", pRequest->requestId, dropdbRsp.db);
      }

      STscObj*         pTscObj = pRequest->pTscObj;
      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
                               .requestId = pRequest->requestId,
                               .requestObjRefId = pRequest->self,
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};

      // Both system databases get the same refresh, in this order.
      const char* sysDbs[] = {TSDB_INFORMATION_SCHEMA_DB, TSDB_PERFORMANCE_SCHEMA_DB};
      char        dbFName[TSDB_DB_FNAME_LEN] = {0};
      for (size_t k = 0; k < sizeof(sysDbs) / sizeof(sysDbs[0]); ++k) {
        (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, sysDbs[k]);
        if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != TSDB_CODE_SUCCESS) {
          tscError("QID:0x%" PRIx64 ", failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
        }
      }
    }
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (pRequest->body.queryFp == NULL) {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  } else {
    doRequestCallback(pRequest, code);
  }
  return code;
}
529

530
/*
 * Handle the response of an alter-super-table request.
 *
 * param: the SRequestObj the response belongs to.
 * pMsg:  response buffer; its pData and pEpSet are freed here.
 * code:  result code of the response.
 *
 * On success a non-empty payload is decoded as SMAlterStbRsp and its meta is
 * stored as the request's execution result. For callback-style requests the
 * result is additionally applied to the catalog before the callback runs;
 * otherwise rspSem is posted. Returns the final result code.
 */
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pRequest = param;
  if (code != TSDB_CODE_SUCCESS) {
    setErrno(pRequest, code);
  } else {
    SMAlterStbRsp alterRsp = {0};
    SDecoder      coder = {0};
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
    // An empty payload is valid and leaves alterRsp zeroed (no meta).
    if (pMsg->len > 0) {
      code = tDecodeSMAlterStbRsp(&coder, &alterRsp);
      if (code != TSDB_CODE_SUCCESS) {
        setErrno(pRequest, code);
      }
    }
    tDecoderClear(&coder);

    pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
    pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (pRequest->body.queryFp != NULL) {
    SExecResult* pRes = &pRequest->body.resInfo.execRes;

    if (code == TSDB_CODE_SUCCESS) {
      SCatalog* pCatalog = NULL;
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
      // Only apply the result when a catalog handle was obtained; otherwise
      // report the catalogGetHandle failure instead of overwriting it and
      // handing a NULL catalog to the handler.
      if (ret == TSDB_CODE_SUCCESS && pRes->res != NULL) {
        ret = handleAlterTbExecRes(pRes->res, pCatalog);
      }

      if (ret != TSDB_CODE_SUCCESS) {
        code = ret;
      }
    }

    doRequestCallback(pRequest, code);
  } else {
    if (tsem_post(&pRequest->body.rspSem) != 0) {
      tscError("failed to post semaphore");
    }
  }
  return code;
}
576

577
/*
 * Build a data block with one row per entry of `pVars` and
 * five VARCHAR columns: name, value, scope, category, info.
 *
 * pVars: array of SVariablesInfo.
 * block: receives the newly allocated block on success; left untouched on
 *        failure.
 *
 * Returns 0 on success, otherwise the error code of the failing step.
 */
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
  // Byte width of each result column, in column order.
  const int32_t colBytes[] = {SHOW_VARIABLES_RESULT_FIELD1_LEN, SHOW_VARIABLES_RESULT_FIELD2_LEN,
                              SHOW_VARIABLES_RESULT_FIELD3_LEN, SHOW_VARIABLES_RESULT_FIELD4_LEN,
                              SHOW_VARIABLES_RESULT_FIELD5_LEN};
  int32_t       code = 0;
  int32_t       line = 0;
  SSDataBlock*  pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // Describe the columns: all VARCHAR, differing only in width.
  for (size_t k = 0; k < sizeof(colBytes) / sizeof(colBytes[0]); ++k) {
    SColumnInfoData infoData = {0};
    infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
    infoData.info.bytes = colBytes[k];
    TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
  }

  int32_t numOfCfg = taosArrayGetSize(pVars);
  code = blockDataEnsureCapacity(pBlock, numOfCfg);
  TSDB_CHECK_CODE(code, line, END);

  // Fill one row per variable; `c` walks the columns and restarts at 0 per row.
  for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
    SVariablesInfo* pInfo = taosArrayGet(pVars, i);
    TSDB_CHECK_NULL(pInfo, code, line, END, terrno);

    char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, name, false);
    TSDB_CHECK_CODE(code, line, END);

    char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, value, false);
    TSDB_CHECK_CODE(code, line, END);

    char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, scope, false);
    TSDB_CHECK_CODE(code, line, END);

    char category[TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(category, pInfo->category, TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, category, false);
    TSDB_CHECK_CODE(code, line, END);

    char info[TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(info, pInfo->info, TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE);
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
    code = colDataSetVal(pColInfo, i, info, false);
    TSDB_CHECK_CODE(code, line, END);
  }

  pBlock->info.rows = numOfCfg;

  *block = pBlock;
  return code;

END:
  // pBlock is NULL when its own allocation failed; it must not be
  // dereferenced in that case.
  // NOTE(review): column buffers reserved by blockDataEnsureCapacity are
  // presumably not released by taosArrayDestroy alone - confirm whether
  // blockDataDestroy should be used here instead.
  if (pBlock != NULL) {
    taosArrayDestroy(pBlock->pDataBlock);
    taosMemoryFree(pBlock);
  }
  return code;
}
661

662
// Serialize the variable list `pVars` into a newly allocated SRetrieveTableRsp.
//
// On success `*pRsp` points at the response and TSDB_CODE_SUCCESS is returned.
// If building the intermediate data block fails, that error is returned and `*pRsp`
// is left untouched. On any later failure the response buffer and the block are
// released, `*pRsp` is reset to NULL and the error code is returned.
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildShowVariablesBlock(pVars, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;

  // Row/column counts are stored byte-swapped (htobe64 / htonl).
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);

  int32_t len = 0;
  // Test the host-order row count. numOfRows has already gone through htobe64(), so
  // comparing that value against zero would depend on the host's byte order.
  if (pBlock->info.rows > 0) {
    len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SHOW_VARIABLES_RESULT_COLS);
    if (len < 0) {
      uError("buildShowVariablesRsp error, len:%d", len);
      code = terrno;
      goto _exit;
    }
    SET_PAYLOAD_LEN((*pRsp)->data, len, len);

    int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
    (*pRsp)->payloadLen = htonl(payloadLen);
    (*pRsp)->compLen = htonl(payloadLen);

    // The encoded size must match the size the buffer was allocated for.
    if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
      uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
             (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
      code = TSDB_CODE_TSC_INVALID_INPUT;
      goto _exit;
    }
  }

  blockDataDestroy(pBlock);
  pBlock = NULL;

  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
722

723
// Response handler for SHOW VARIABLES.
//
// When `code` is already an error it is recorded on the request via setErrno().
// Otherwise the message body is deserialized, converted into a table response and
// passed to setQueryResultFromRsp(); if any of those steps fails, the request's
// pRspMsg is cleared and the locally built response is freed.
// The message buffers (pData, pEpSet) are always released here. Finally the request
// is completed through doRequestCallback() when a query callback is installed, or by
// posting the response semaphore otherwise. Returns the final `code`.
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SShowVariablesRsp  varsRsp = {0};
    SRetrieveTableRsp* pTableRsp = NULL;

    code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &varsRsp);
    if (TSDB_CODE_SUCCESS == code) {
      code = buildShowVariablesRsp(varsRsp.variables, &pTableRsp);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = setQueryResultFromRsp(&pReq->body.resInfo, pTableRsp, false, pReq->stmtBindVersion > 0);
    }

    if (0 != code) {
      pReq->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pTableRsp);
    }
    tFreeSShowVariablesRsp(&varsRsp);
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    if (0 != tsem_post(&pReq->body.rspSem)) {
      tscError("failed to post semaphore");
    }
  } else {
    doRequestCallback(pReq, code);
  }
  return code;
}
757

758
// Build a one-row, three-column data block (result, id, reason) describing the
// outcome of a compact request.
//
// When pRsp->bAccepted is set the row is ("accepted", compactId, "success");
// otherwise it is ("rejected", NULL, "compaction is ongoing").
// On success the block is stored in `*block` and TSDB_CODE_SUCCESS is returned.
// On failure the error code is returned and `*block` is not written.
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // Column 0: result text.
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  // Column 1: integer id.
  infoData.info.type = TSDB_DATA_TYPE_INT;
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  // Column 2: reason text.
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);

  char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
  char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
  if (pRsp->bAccepted) {
    STR_TO_VARSTR(result, "accepted");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
    TSDB_CHECK_CODE(code, line, END);
    STR_TO_VARSTR(reason, "success");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  } else {
    STR_TO_VARSTR(result, "rejected");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    colDataSetNULL(pIdCol, 0);
    STR_TO_VARSTR(reason, "compaction is ongoing");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  }
  pBlock->info.rows = 1;

  *block = pBlock;

  return TSDB_CODE_SUCCESS;

END:
  // pBlock is NULL when the first allocation failed, so guard the dereference, and
  // release the column array before the block that owns the pointer to it.
  // NOTE(review): buffers attached by blockDataEnsureCapacity() may need
  // blockDataDestroy() instead - confirm against tdatablock.
  if (pBlock) {
    taosArrayDestroy(pBlock->pDataBlock);
    taosMemoryFree(pBlock);
  }
  return code;
}
820

821
// Turn a compact-db reply into a newly allocated SRetrieveTableRsp holding the
// single result row produced by buildCompactDbBlock().
//
// On success `*pRsp` points at the response and TSDB_CODE_SUCCESS is returned.
// If the block cannot be built, that error is returned and `*pRsp` is left untouched.
// On any later failure the response buffer and the block are released, `*pRsp` is
// reset to NULL and the error code is returned.
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildCompactDbBlock(pCompactDb, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;
  (*pRsp)->compLen = 0;
  (*pRsp)->payloadLen = 0;
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);

  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, COMPACT_DB_RESULT_COLS);
  if (len < 0) {
    uError("buildRetriveTableRspForCompactDb error, len:%d", len);
    code = terrno;
    goto _exit;
  }
  blockDataDestroy(pBlock);
  // Clear the pointer so the _exit path below cannot destroy the block a second time.
  pBlock = NULL;

  SET_PAYLOAD_LEN((*pRsp)->data, len, len);

  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
  (*pRsp)->payloadLen = htonl(payloadLen);
  (*pRsp)->compLen = htonl(payloadLen);

  // The encoded size must match the size the buffer was allocated for.
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
    uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto _exit;
  }

  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
878

879
// Response handler for COMPACT DATABASE.
//
// When `code` is already an error it is recorded on the request via setErrno().
// Otherwise the message body is deserialized, converted into a table response and
// passed to setQueryResultFromRsp(); if any step fails, the request's pRspMsg is
// cleared and the locally built response is freed.
// The message buffers (pData, pEpSet) are always released here. The request is then
// completed by invoking its query callback when one is installed, or by posting the
// response semaphore otherwise. Returns the final `code`.
int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SCompactDbRsp      compactRsp = {0};
    SRetrieveTableRsp* pTableRsp = NULL;

    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, &compactRsp);
    if (TSDB_CODE_SUCCESS == code) {
      code = buildRetriveTableRspForCompactDb(&compactRsp, &pTableRsp);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = setQueryResultFromRsp(&pReq->body.resInfo, pTableRsp, false, pReq->stmtBindVersion > 0);
    }

    if (0 != code) {
      pReq->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pTableRsp);
    }
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    if (0 != tsem_post(&pReq->body.rspSem)) {
      tscError("failed to post semaphore");
    }
  } else {
    pReq->body.queryFp(((SSyncQueryParam*)pReq->body.interParam)->userParam, pReq, code);
  }
  return code;
}
912

913
// Build a one-row, three-column data block (result, id, reason) describing the
// outcome of a scan request.
//
// When pRsp->bAccepted is set the row is ("accepted", scanId, "success");
// otherwise it is ("rejected", NULL, "scan is ongoing").
// On success the block is stored in `*block` and TSDB_CODE_SUCCESS is returned.
// On failure the error code is returned and `*block` is not written.
static int32_t buildScanDbBlock(SScanDbRsp* pRsp, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(SCAN_DB_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);

  // Column 0: result text.
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = SCAN_DB_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  // Column 1: integer id.
  infoData.info.type = TSDB_DATA_TYPE_INT;
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  // Column 2: reason text.
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = SCAN_DB_RESULT_FIELD3_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);

  char result[SCAN_DB_RESULT_FIELD1_LEN] = {0};
  char reason[SCAN_DB_RESULT_FIELD3_LEN] = {0};
  if (pRsp->bAccepted) {
    STR_TO_VARSTR(result, "accepted");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->scanId, false);
    TSDB_CHECK_CODE(code, line, END);
    STR_TO_VARSTR(reason, "success");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  } else {
    STR_TO_VARSTR(result, "rejected");
    code = colDataSetVal(pResultCol, 0, result, false);
    TSDB_CHECK_CODE(code, line, END);
    colDataSetNULL(pIdCol, 0);
    STR_TO_VARSTR(reason, "scan is ongoing");
    code = colDataSetVal(pReasonCol, 0, reason, false);
    TSDB_CHECK_CODE(code, line, END);
  }
  pBlock->info.rows = 1;

  *block = pBlock;

  return TSDB_CODE_SUCCESS;

END:
  // pBlock is NULL when the first allocation failed, so guard the dereference, and
  // release the column array before the block that owns the pointer to it.
  // NOTE(review): buffers attached by blockDataEnsureCapacity() may need
  // blockDataDestroy() instead - confirm against tdatablock.
  if (pBlock) {
    taosArrayDestroy(pBlock->pDataBlock);
    taosMemoryFree(pBlock);
  }
  return code;
}
975

976
// Turn a scan-db reply into a newly allocated SRetrieveTableRsp holding the single
// result row produced by buildScanDbBlock().
//
// On success `*pRsp` points at the response and TSDB_CODE_SUCCESS is returned.
// If the block cannot be built, that error is returned and `*pRsp` is left untouched.
// On any later failure the response buffer and the block are released, `*pRsp` is
// reset to NULL and the error code is returned.
static int32_t buildRetriveTableRspForScanDb(SScanDbRsp* pScanDb, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildScanDbBlock(pScanDb, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;
  (*pRsp)->compLen = 0;
  (*pRsp)->payloadLen = 0;
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(SCAN_DB_RESULT_COLS);

  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SCAN_DB_RESULT_COLS);
  if (len < 0) {
    uError("%s error, len:%d", __func__, len);
    code = terrno;
    goto _exit;
  }
  blockDataDestroy(pBlock);
  // Clear the pointer so the _exit path below cannot destroy the block a second time.
  pBlock = NULL;

  SET_PAYLOAD_LEN((*pRsp)->data, len, len);

  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
  (*pRsp)->payloadLen = htonl(payloadLen);
  (*pRsp)->compLen = htonl(payloadLen);

  // The encoded size must match the size the buffer was allocated for.
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
    uError("%s error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, __func__, len,
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto _exit;
  }

  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
1033

1034
// Response handler for SCAN DATABASE.
//
// When `code` is already an error it is recorded on the request via setErrno().
// Otherwise the message body is deserialized, converted into a table response and
// passed to setQueryResultFromRsp(); if any step fails, the request's pRspMsg is
// cleared and the locally built response is freed.
// The message buffers (pData, pEpSet) are always released here. The request is then
// completed by invoking its query callback when one is installed, or by posting the
// response semaphore otherwise. Returns the final `code`.
static int32_t processScanDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SScanDbRsp         scanRsp = {0};
    SRetrieveTableRsp* pTableRsp = NULL;

    code = tDeserializeSScanDbRsp(pMsg->pData, pMsg->len, &scanRsp);
    if (TSDB_CODE_SUCCESS == code) {
      code = buildRetriveTableRspForScanDb(&scanRsp, &pTableRsp);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = setQueryResultFromRsp(&pReq->body.resInfo, pTableRsp, false, pReq->stmtBindVersion > 0);
    }

    if (0 != code) {
      pReq->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pTableRsp);
    }
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    if (0 != tsem_post(&pReq->body.rspSem)) {
      tscError("failed to post semaphore");
    }
  } else {
    pReq->body.queryFp(((SSyncQueryParam*)pReq->body.interParam)->userParam, pReq, code);
  }
  return code;
}
1067

1068
// Response handler for TRIM DATABASE.
//
// The reply is decoded with the compact-db deserializer and rendered with the
// compact-db table builder.
// When `code` is already an error it is recorded on the request via setErrno().
// Otherwise the message body is deserialized, converted into a table response and
// passed to setQueryResultFromRsp(); if any step fails, the request's pRspMsg is
// cleared and the locally built response is freed.
// The message buffers (pData, pEpSet) are always released here. The request is then
// completed by invoking its query callback when one is installed, or by posting the
// response semaphore otherwise. Returns the final `code`.
int32_t processTrimDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    STrimDbRsp         trimRsp = {0};
    SRetrieveTableRsp* pTableRsp = NULL;

    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, (SCompactDbRsp*)&trimRsp);
    if (TSDB_CODE_SUCCESS == code) {
      code = buildRetriveTableRspForCompactDb(&trimRsp, &pTableRsp);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = setQueryResultFromRsp(&pReq->body.resInfo, pTableRsp, false, pReq->stmtBindVersion > 0);
    }

    if (0 != code) {
      pReq->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pTableRsp);
    }
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    if (0 != tsem_post(&pReq->body.rspSem)) {
      tscError("failed to post semaphore");
    }
  } else {
    pReq->body.queryFp(((SSyncQueryParam*)pReq->body.interParam)->userParam, pReq, code);
  }
  return code;
}
1101

1102
// Build the single-column data block returned for CREATE TOKEN.
//
// With a NULL `pRsp` the block has the column definition but zero rows; otherwise it
// holds one row containing pRsp->token.
// On success the block is stored in `*block` and TSDB_CODE_SUCCESS is returned.
// On failure the partially built block is released and the error code is returned.
static int32_t buildCreateTokenBlock(SCreateTokenRsp* pRsp, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  // Exactly one block is needed; the column-count macro sizes the column array, not
  // the number of blocks.
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(CREATE_TOKEN_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = CREATE_TOKEN_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  // Handle empty result case (when pRsp is NULL)
  if (pRsp == NULL) {
    pBlock->info.rows = 0;
    *block = pBlock;
    return TSDB_CODE_SUCCESS;
  }

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);

  char result[sizeof(pRsp->token) + 64] = {0};
  STR_TO_VARSTR(result, pRsp->token);
  code = colDataSetVal(pResultCol, 0, result, false);
  TSDB_CHECK_CODE(code, line, END);

  pBlock->info.rows = 1;

  *block = pBlock;
  return TSDB_CODE_SUCCESS;

END:
  if (pBlock) {
    taosArrayDestroy(pBlock->pDataBlock);
    taosMemoryFree(pBlock);
  }
  return code;
}
1146

1147
// Turn a create-token reply into a newly allocated SRetrieveTableRsp.
//
// `pResp` may be NULL, in which case the response carries zero rows and only the
// payload prefix. On success `*pRsp` points at the response and TSDB_CODE_SUCCESS is
// returned. If the block cannot be built, that error is returned and `*pRsp` is left
// untouched. On any later failure the response buffer and the block are released,
// `*pRsp` is reset to NULL and the error code is returned.
static int32_t buildTableRspForCreateToken(SCreateTokenRsp* pResp, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildCreateTokenBlock(pResp, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;

  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(CREATE_TOKEN_RESULT_COLS);

  int32_t len = 0;
  if (pBlock->info.rows > 0) {
    len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, CREATE_TOKEN_RESULT_COLS);
    if (len < 0) {
      // __func__ keeps the logged name in sync with the real function name.
      uError("%s error, len:%d", __func__, len);
      code = terrno;
      goto _exit;
    }

    SET_PAYLOAD_LEN((*pRsp)->data, len, len);

    int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
    (*pRsp)->payloadLen = htonl(payloadLen);
    (*pRsp)->compLen = htonl(payloadLen);

    // The encoded size must match the size the buffer was allocated for.
    if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
      uError("%s error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, __func__, len,
             (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
      code = TSDB_CODE_TSC_INVALID_INPUT;
      goto _exit;
    }
  } else {
    // Empty result case
    SET_PAYLOAD_LEN((*pRsp)->data, 0, 0);
    (*pRsp)->payloadLen = htonl(PAYLOAD_PREFIX_LEN);
    (*pRsp)->compLen = htonl(PAYLOAD_PREFIX_LEN);
  }
  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
1211

1212
// Response handler for CREATE TOKEN.
//
// When `code` is already an error it is recorded on the request via setErrno().
// Otherwise a zero-length message yields an empty table response, while a non-empty
// one is deserialized and converted into a one-row table response; the result is
// passed to setQueryResultFromRsp(). If any step fails, the request's pRspMsg is
// cleared and the locally built response is freed.
// The message buffers (pData, pEpSet) are always released here. The request is then
// completed by invoking its query callback when one is installed, or by posting the
// response semaphore otherwise. Returns the final `code`.
int32_t processCreateTokenRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SCreateTokenRsp    tokenRsp = {0};
    SRetrieveTableRsp* pTableRsp = NULL;

    if (0 != pMsg->len) {
      code = tDeserializeSCreateTokenResp(pMsg->pData, pMsg->len, &tokenRsp);
      if (TSDB_CODE_SUCCESS == code) {
        code = buildTableRspForCreateToken(&tokenRsp, &pTableRsp);
      }
    } else {
      // Handle empty message case
      code = buildTableRspForCreateToken(NULL, &pTableRsp);
    }

    if (TSDB_CODE_SUCCESS == code) {
      code = setQueryResultFromRsp(&pReq->body.resInfo, pTableRsp, false, pReq->stmtBindVersion > 0);
    }

    if (0 != code) {
      pReq->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pTableRsp);
    }
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    if (0 != tsem_post(&pReq->body.rspSem)) {
      tscError("failed to post semaphore");
    }
  } else {
    pReq->body.queryFp(((SSyncQueryParam*)pReq->body.interParam)->userParam, pReq, code);
  }
  return code;
}
1250

1251
// Build the single-column, single-row data block returned for CREATE TOTP SECRET;
// the row contains pRsp->totpSecret.
//
// On success the block is stored in `*block` and TSDB_CODE_SUCCESS is returned.
// On failure the partially built block is released and the error code is returned.
static int32_t buildCreateTotpSecretBlock(SCreateTotpSecretRsp* pRsp, SSDataBlock** block) {
  int32_t      code = 0;
  int32_t      line = 0;
  // Exactly one block is needed; the column-count macro sizes the column array, not
  // the number of blocks.
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
  pBlock->info.hasVarCol = true;

  pBlock->pDataBlock = taosArrayInit(CREATE_TOTP_SECRET_RESULT_COLS, sizeof(SColumnInfoData));
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
  SColumnInfoData infoData = {0};
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
  infoData.info.bytes = CREATE_TOTP_SECRET_RESULT_FIELD1_LEN;
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);

  code = blockDataEnsureCapacity(pBlock, 1);
  TSDB_CHECK_CODE(code, line, END);

  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);

  char result[sizeof(pRsp->totpSecret) + 64] = {0};
  STR_TO_VARSTR(result, pRsp->totpSecret);
  code = colDataSetVal(pResultCol, 0, result, false);
  TSDB_CHECK_CODE(code, line, END);

  pBlock->info.rows = 1;

  *block = pBlock;
  return TSDB_CODE_SUCCESS;

END:
  if (pBlock) {
    taosArrayDestroy(pBlock->pDataBlock);
    taosMemoryFree(pBlock);
  }
  return code;
}
1288

1289
// Turn a create-TOTP-secret reply into a newly allocated SRetrieveTableRsp holding
// the single result row produced by buildCreateTotpSecretBlock().
//
// On success `*pRsp` points at the response and TSDB_CODE_SUCCESS is returned.
// If the block cannot be built, that error is returned and `*pRsp` is left untouched.
// On any later failure the response buffer and the block are released, `*pRsp` is
// reset to NULL and the error code is returned.
static int32_t buildTableRspForCreateTotpSecret(SCreateTotpSecretRsp* pResp, SRetrieveTableRsp** pRsp) {
  SSDataBlock* pBlock = NULL;
  int32_t      code = buildCreateTotpSecretBlock(pResp, &pBlock);
  if (code) {
    return code;
  }

  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
  *pRsp = taosMemoryCalloc(1, rspSize);
  if (NULL == *pRsp) {
    code = terrno;
    goto _exit;
  }

  (*pRsp)->useconds = 0;
  (*pRsp)->completed = 1;
  (*pRsp)->precision = 0;
  (*pRsp)->compressed = 0;

  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
  (*pRsp)->numOfCols = htonl(CREATE_TOTP_SECRET_RESULT_COLS);

  int32_t len =
      blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, CREATE_TOTP_SECRET_RESULT_COLS);
  if (len < 0) {
    // __func__ keeps the logged name in sync with the real function name.
    uError("%s error, len:%d", __func__, len);
    code = terrno;
    goto _exit;
  }

  blockDataDestroy(pBlock);
  // Clear the pointer so the _exit path below cannot destroy the block a second time.
  pBlock = NULL;
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);

  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
  (*pRsp)->payloadLen = htonl(payloadLen);
  (*pRsp)->compLen = htonl(payloadLen);

  // The encoded size must match the size the buffer was allocated for.
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
    uError("%s error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, __func__, len,
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
    code = TSDB_CODE_TSC_INVALID_INPUT;
    goto _exit;
  }
  return TSDB_CODE_SUCCESS;

_exit:
  if (*pRsp) {
    taosMemoryFree(*pRsp);
    *pRsp = NULL;
  }
  if (pBlock) {
    blockDataDestroy(pBlock);
    pBlock = NULL;
  }
  return code;
}
1346

1347
// Response handler for CREATE TOTP SECRET.
//
// When `code` is already an error it is recorded on the request via setErrno().
// Otherwise the message body is deserialized, converted into a table response and
// passed to setQueryResultFromRsp(); if any step fails, the request's pRspMsg is
// cleared and the locally built response is freed.
// The message buffers (pData, pEpSet) are always released here. The request is then
// completed by invoking its query callback when one is installed, or by posting the
// response semaphore otherwise. Returns the final `code`.
int32_t processCreateTotpSecretRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS == code) {
    SCreateTotpSecretRsp secretRsp = {0};
    SRetrieveTableRsp*   pTableRsp = NULL;

    code = tDeserializeSCreateTotpSecretRsp(pMsg->pData, pMsg->len, &secretRsp);
    if (TSDB_CODE_SUCCESS == code) {
      code = buildTableRspForCreateTotpSecret(&secretRsp, &pTableRsp);
    }
    if (TSDB_CODE_SUCCESS == code) {
      code = setQueryResultFromRsp(&pReq->body.resInfo, pTableRsp, false, pReq->stmtBindVersion > 0);
    }

    if (0 != code) {
      pReq->body.resInfo.pRspMsg = NULL;
      taosMemoryFree(pTableRsp);
    }
  } else {
    setErrno(pReq, code);
  }

  taosMemoryFree(pMsg->pData);
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    if (0 != tsem_post(&pReq->body.rspSem)) {
      tscError("failed to post semaphore");
    }
  } else {
    pReq->body.queryFp(((SSyncQueryParam*)pReq->body.interParam)->userParam, pReq, code);
  }
  return code;
}
1378

1379
// Response handler for CREATE XNODE TASK.
//
// On error the code is recorded on the request via setErrno(). For
// TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR with a non-empty payload, the payload is also
// placed in the request's message buffer: copied with tstrncpy() when its length
// does not exceed msgBufLen, otherwise the old buffer is freed and the payload
// buffer itself is adopted as msgBuf (pMsg->pData is then cleared so it is not freed
// below).
// Any remaining pData and the pEpSet are released here. The request is then completed
// by invoking its query callback when one is installed, or by posting the response
// semaphore otherwise. Returns `code` unchanged.
int32_t processCreateXnodeTaskRsp(void* param, SDataBuf* pMsg, int32_t code) {
  SRequestObj* pReq = param;

  if (TSDB_CODE_SUCCESS != code) {
    setErrno(pReq, code);

    if (TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR == code && NULL != pMsg->pData && pMsg->len > 0) {
      if (pMsg->len > pReq->msgBufLen) {
        // Payload is larger than the current buffer: take over the payload buffer.
        taosMemoryFreeClear(pReq->msgBuf);
        pReq->msgBuf = pMsg->pData;
        pMsg->pData = NULL;
        pReq->msgBufLen = pMsg->len;
      } else {
        tstrncpy(pReq->msgBuf, (char*)pMsg->pData, pReq->msgBufLen);
      }
    }
  }

  if (NULL != pMsg->pData) {
    taosMemoryFree(pMsg->pData);
  }
  taosMemoryFree(pMsg->pEpSet);

  if (NULL == pReq->body.queryFp) {
    if (0 != tsem_post(&pReq->body.rspSem)) {
      tscError("failed to post semaphore");
    }
  } else {
    pReq->body.queryFp(((SSyncQueryParam*)pReq->body.interParam)->userParam, pReq, code);
  }
  return code;
}
1409

1410

1411
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
204,374,737✔
1412
  switch (msgType) {
204,374,737✔
1413
    case TDMT_MND_CONNECT:
94,466,564✔
1414
      return processConnectRsp;
94,466,564✔
1415
    case TDMT_MND_CREATE_DB:
1,590,888✔
1416
      return processCreateDbRsp;
1,590,888✔
1417
    case TDMT_MND_USE_DB:
94,121,244✔
1418
      return processUseDbRsp;
94,121,244✔
1419
    case TDMT_MND_CREATE_STB:
2,127,989✔
1420
      return processCreateSTableRsp;
2,127,989✔
1421
    case TDMT_MND_DROP_DB:
1,405,439✔
1422
      return processDropDbRsp;
1,405,439✔
1423
    case TDMT_MND_ALTER_STB:
5,974,961✔
1424
      return processAlterStbRsp;
5,974,961✔
1425
    case TDMT_MND_SHOW_VARIABLES:
8,252✔
1426
      return processShowVariablesRsp;
8,252✔
1427
    case TDMT_MND_COMPACT_DB:
34,983✔
1428
      return processCompactDbRsp;
34,983✔
1429
    case TDMT_MND_TRIM_DB:
23,932✔
1430
      return processTrimDbRsp;
23,932✔
1431
    case TDMT_MND_SCAN_DB:
762✔
1432
      return processScanDbRsp;
762✔
1433
    case TDMT_MND_CREATE_TOKEN:
23,109✔
1434
      return processCreateTokenRsp;
23,109✔
1435
    case TDMT_MND_CREATE_TOTP_SECRET:
18,681✔
1436
      return processCreateTotpSecretRsp;
18,681✔
1437
    case TDMT_MND_CREATE_XNODE_TASK:
4,257✔
1438
      return processCreateXnodeTaskRsp;
4,257✔
1439

1440
    default:
4,573,676✔
1441
      return genericRspCallback;
4,573,676✔
1442
  }
1443
}
1444

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc