• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5017

09 Apr 2026 02:37PM UTC coverage: 72.248% (-0.05%) from 72.299%
#5017

push

travis-ci

web-flow
merge: from main to 3.0 branch #35095 

merge: from main to 3.0 branch

499 of 655 new or added lines in 34 files covered. (76.18%)

821 existing lines in 156 files now uncovered.

257359 of 356215 relevant lines covered (72.25%)

132044878.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.61
/source/client/src/clientMsgHandler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "clientSession.h"
21
#include "cmdnodes.h"
22
#include "command.h"
23
#include "os.h"
24
#include "query.h"
25
#include "systable.h"
26
#include "tdatablock.h"
27
#include "tdef.h"
28
#include "tglobal.h"
29
#include "tmsg.h"
30
#include "tname.h"
31
#include "tversion.h"
32

33
extern SClientHbMgr clientHbMgr;
34

35
static void setErrno(SRequestObj* pRequest, int32_t code) {
5,600,893✔
36
  pRequest->code = code;
5,600,893✔
37
  terrno = code;
5,600,893✔
38
}
5,600,893✔
39

40
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
4,463,929✔
41
  SRequestObj* pRequest = param;
4,463,929✔
42
  setErrno(pRequest, code);
4,463,929✔
43

44
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
4,463,929✔
45
    if (removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)) != 0) {
1,292,579✔
46
      tscError("failed to remove meta data for table");
×
47
    }
48
  }
49

50
  taosMemoryFree(pMsg->pEpSet);
4,463,929✔
51
  taosMemoryFree(pMsg->pData);
4,463,929✔
52
  if (pRequest->body.queryFp != NULL) {
4,463,929✔
53
    doRequestCallback(pRequest, code);
4,463,929✔
54
  } else {
55
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
56
      tscError("failed to post semaphore");
×
57
    }
58
  }
59
  return code;
4,463,929✔
60
}
61

62
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
97,714,201✔
63
  SRequestObj* pRequest = acquireRequest(*(int64_t*)param);
97,714,201✔
64
  if (NULL == pRequest) {
97,720,756✔
65
    goto EXIT;
×
66
  }
67

68
  if (code != TSDB_CODE_SUCCESS) {
97,720,756✔
69
    goto End;
15,166✔
70
  }
71

72
  STscObj* pTscObj = pRequest->pTscObj;
97,705,590✔
73

74
  if (NULL == pTscObj->pAppInfo) {
97,705,590✔
75
    code = TSDB_CODE_TSC_DISCONNECTED;
×
76
    goto End;
×
77
  }
78

79
  if (pTscObj->connType == CONN_TYPE__AUTH_TEST) {
97,705,590✔
80
    // auth test connection, no need to process connect rsp
UNCOV
81
    goto End;
×
82
  }
83

84
  SConnectRsp connectRsp = {0};
97,705,590✔
85
  if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) {
97,705,590✔
86
    code = TSDB_CODE_TSC_INVALID_VERSION;
×
87
    goto End;
×
88
  }
89

90
  if ((code = taosCheckVersionCompatibleFromStr(td_version, connectRsp.sVer, 3)) != 0) {
97,697,614✔
91
    tscError("version not compatible. client version:%s, server version:%s", td_version, connectRsp.sVer);
×
92
    goto End;
×
93
  }
94

95
  int32_t now = taosGetTimestampSec();
97,680,427✔
96
  int32_t delta = abs(now - connectRsp.svrTimestamp);
97,674,462✔
97
  if (delta > tsTimestampDeltaLimit) {
97,674,462✔
98
    code = TSDB_CODE_TIME_UNSYNCED;
5,641✔
99
    tscError("time diff:%ds is too big", delta);
5,641✔
100
    goto End;
×
101
  }
102

103
  if (connectRsp.epSet.numOfEps == 0) {
97,668,821✔
104
    code = TSDB_CODE_APP_ERROR;
×
105
    goto End;
×
106
  }
107

108
  int    updateEpSet = 1;
97,668,821✔
109
  SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
97,668,821✔
110
  if (connectRsp.dnodeNum == 1) {
97,665,585✔
111
    SEpSet dstEpSet = connectRsp.epSet;
97,404,226✔
112
    if (srcEpSet.numOfEps == 1) {
97,404,226✔
113
      if (rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
96,245,680✔
114
                            dstEpSet.eps[dstEpSet.inUse].fqdn) != 0) {
96,263,559✔
115
        tscError("failed to set default addr for rpc");
×
116
      }
117
      updateEpSet = 0;
96,237,852✔
118
    }
119
  }
120
  if (updateEpSet == 1 && !isEpsetEqual(&srcEpSet, &connectRsp.epSet)) {
97,639,912✔
121
    SEpSet corEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
1,229,808✔
122

123
    SEpSet* pOrig = &corEpSet;
1,167,931✔
124
    SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
1,167,931✔
125
    SEp*    pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
1,167,931✔
126
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,
1,167,931✔
127
             pOrigEp->fqdn, pOrigEp->port, connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn,
128
             pNewEp->port);
129
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
1,167,931✔
130
  }
131

132
  for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
195,214,633✔
133
    tscDebug("QID:0x%" PRIx64 ", epSet.fqdn[%d]:%s port:%d, conn:0x%" PRIx64, pRequest->requestId, i,
97,601,690✔
134
             connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
135
  }
136

137
  pTscObj->sysInfo = connectRsp.sysInfo;
97,612,943✔
138
  pTscObj->connId = connectRsp.connId;
97,613,738✔
139
  pTscObj->acctId = connectRsp.acctId;
97,613,742✔
140
  if (pTscObj->user[0] == 0) {
97,613,525✔
141
    tstrncpy(pTscObj->user, connectRsp.user, tListLen(pTscObj->user));
1,798✔
142
    tstrncpy(pTscObj->tokenName, connectRsp.tokenName, tListLen(pTscObj->tokenName));
1,798✔
143
  } else {
144
    pTscObj->tokenName[0] = 0;
97,610,722✔
145
  }
146
  tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer));
97,613,655✔
147
  tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer));
97,612,988✔
148

149
  // update the appInstInfo
150
  pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
97,612,981✔
151
  pTscObj->pAppInfo->serverCfg.monitorParas = connectRsp.monitorParas;
97,613,776✔
152
  pTscObj->pAppInfo->serverCfg.enableAuditDelete = connectRsp.enableAuditDelete;
97,613,580✔
153
  pTscObj->pAppInfo->serverCfg.enableAuditSelect = connectRsp.enableAuditSelect;
97,612,497✔
154
  pTscObj->pAppInfo->serverCfg.enableAuditInsert = connectRsp.enableAuditInsert;
97,612,720✔
155
  pTscObj->pAppInfo->serverCfg.auditLevel = connectRsp.auditLevel;
97,612,409✔
156
  tscDebug("monitor paras from connect rsp, clusterId:0x%" PRIx64 ", threshold:%d scope:%d",
97,612,169✔
157
           connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope);
158
  lastClusterId = connectRsp.clusterId;
97,612,169✔
159

160
  pTscObj->connType = connectRsp.connType;
97,612,169✔
161
  pTscObj->passInfo.ver = connectRsp.passVer;
97,612,621✔
162
  pTscObj->authVer = connectRsp.authVer;
97,612,133✔
163
  pTscObj->whiteListInfo.ver = connectRsp.whiteListVer;
97,613,035✔
164
  pTscObj->userId = connectRsp.userId;
97,613,068✔
165

166
  if (taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL) {
97,612,134✔
167
    if (taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo,
1,486,626✔
168
                    POINTER_BYTES) != 0) {
169
      tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
×
170
    } else {
171
#ifdef USE_MONITOR
172
      MonitorSlowLogData data = {0};
1,486,626✔
173
      data.clusterId = pTscObj->pAppInfo->clusterId;
1,486,626✔
174
      data.type = SLOW_LOG_READ_ALL;
1,486,626✔
175
      (void)monitorPutData2MonitorQueue(data);  // ignore return code
1,486,626✔
176
      monitorClientSlowQueryInit(connectRsp.clusterId);
1,486,626✔
177
      monitorClientSQLReqInit(connectRsp.clusterId);
1,486,626✔
178
#endif
179
    }
180
  }
181

182
  code = tscRefSessMetric(pTscObj);
97,651,458✔
183
  if (TSDB_CODE_SUCCESS != code) {
97,705,150✔
184
    tscError("failed to connect with user:%s, code:%s", pTscObj->user, tstrerror(code));
×
185
    goto End;
×
186
  }
187

188
  sessMetricRef(pTscObj->pSessMetric);
97,705,150✔
189

190
  SSessParam pPara = {.type = SESSION_PER_USER, .value = 1};
97,705,666✔
191
  code = tscUpdateSessMetric(pTscObj, &pPara);
97,705,666✔
192
  if (TSDB_CODE_SUCCESS != code) {
97,705,144✔
193
    tscError("failed to connect with user:%s, code:%s", pTscObj->user, tstrerror(code));
×
194
    goto End;
×
195
  }
196

197
  (void)taosThreadMutexLock(&clientHbMgr.lock);
97,705,144✔
198
  SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
97,706,032✔
199
  if (pAppHbMgr) {
97,706,032✔
200
    if (hbRegisterConn(pAppHbMgr, pTscObj->id, pTscObj->user, pTscObj->tokenName, connectRsp.clusterId, connectRsp.connType) != 0) {
97,706,032✔
201
      tscError("QID:0x%" PRIx64 ", failed to register conn to hbMgr", pRequest->requestId);
×
202
    }
203
  } else {
204
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
×
205
    code = TSDB_CODE_TSC_DISCONNECTED;
×
206
    goto End;
×
207
  }
208
  (void)taosThreadMutexUnlock(&clientHbMgr.lock);
97,706,032✔
209

210
  tscDebug("QID:0x%" PRIx64 ", clusterId:0x%" PRIx64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
97,705,938✔
211
           pTscObj->pAppInfo->numOfConns);
212

213
End:
97,699,445✔
214
  if (code != 0) {
97,721,104✔
215
    setErrno(pRequest, code);
15,166✔
216
  }
217
  if (tsem_post(&pRequest->body.rspSem) != 0) {
97,721,104✔
218
    tscError("failed to post semaphore");
×
219
  }
220

221
  if (pRequest) {
97,719,579✔
222
    (void)releaseRequest(pRequest->self);
97,719,579✔
223
  }
224

225
EXIT:
2,851,396✔
226
  taosMemoryFree(param);
97,720,918✔
227
  taosMemoryFree(pMsg->pEpSet);
97,720,061✔
228
  taosMemoryFree(pMsg->pData);
97,719,196✔
229
  return code;
97,720,518✔
230
}
231

232
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
112,846,739✔
233
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
112,846,739✔
234
  if (pMsgSendInfo == NULL) return pMsgSendInfo;
112,850,989✔
235
  pMsgSendInfo->requestObjRefId = pRequest->self;
112,850,989✔
236
  pMsgSendInfo->requestId = pRequest->requestId;
112,850,989✔
237
  pMsgSendInfo->param = pRequest;
112,850,940✔
238
  pMsgSendInfo->msgType = pRequest->type;
112,850,989✔
239
  pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
112,850,989✔
240

241
  pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
112,850,989✔
242
  pMsgSendInfo->fp = getMsgRspHandle(pRequest->type);
112,850,940✔
243
  return pMsgSendInfo;
112,851,191✔
244
}
245

246
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
1,538,757✔
247
  // todo rsp with the vnode id list
248
  SRequestObj* pRequest = param;
1,538,757✔
249
  taosMemoryFree(pMsg->pData);
1,538,757✔
250
  taosMemoryFree(pMsg->pEpSet);
1,538,757✔
251
  if (code != TSDB_CODE_SUCCESS) {
1,538,757✔
252
    setErrno(pRequest, code);
32,119✔
253
  } else {
254
    struct SCatalog* pCatalog = NULL;
1,506,638✔
255
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,506,638✔
256
    if (TSDB_CODE_SUCCESS == code) {
1,506,638✔
257
      STscObj* pTscObj = pRequest->pTscObj;
1,506,638✔
258

259
      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
1,506,638✔
260
                               .requestId = pRequest->requestId,
1,506,638✔
261
                               .requestObjRefId = pRequest->self,
1,506,638✔
262
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
1,506,638✔
263
      char             dbFName[TSDB_DB_FNAME_LEN];
1,472,744✔
264
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
1,506,638✔
265
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
1,506,638✔
266
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info", pRequest->requestId);
×
267
      }
268
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
1,506,638✔
269
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
1,506,638✔
270
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info", pRequest->requestId);
×
271
      }
272
    }
273
  }
274

275
  if (pRequest->body.queryFp) {
1,538,757✔
276
    doRequestCallback(pRequest, code);
1,538,757✔
277
  } else {
278
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
279
      tscError("failed to post semaphore");
×
280
    }
281
  }
282
  return code;
1,538,757✔
283
}
284

285
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
97,421,832✔
286
  SRequestObj* pRequest = param;
97,421,832✔
287
  if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
97,421,832✔
288
      TSDB_CODE_MND_DB_IN_DROPPING == code) {
289
    SUseDbRsp usedbRsp = {0};
1,474✔
290
    if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
1,782✔
291
      tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
1,782✔
292
    }
293
    struct SCatalog* pCatalog = NULL;
1,782✔
294

295
    if (usedbRsp.vgVersion >= 0) {  // cached in local
1,782✔
296
      int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
1,782✔
297
      int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
1,782✔
298
      if (code1 != TSDB_CODE_SUCCESS) {
1,782✔
299
        tscWarn("QID:0x%" PRIx64 ", catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
×
300
                tstrerror(code1));
301
      } else {
302
        if (catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid) != 0) {
1,782✔
303
          tscError("QID:0x%" PRIx64 ", catalogRemoveDB failed, db:%s, uid:%" PRId64, pRequest->requestId, usedbRsp.db,
×
304
                   usedbRsp.uid);
305
        }
306
      }
307
    }
308
    tFreeSUsedbRsp(&usedbRsp);
1,782✔
309
  }
310

311
  if (code != TSDB_CODE_SUCCESS) {
97,426,917✔
312
    taosMemoryFree(pMsg->pData);
1,931✔
313
    taosMemoryFree(pMsg->pEpSet);
1,931✔
314
    setErrno(pRequest, code);
1,931✔
315

316
    if (pRequest->body.queryFp != NULL) {
1,931✔
317
      doRequestCallback(pRequest, pRequest->code);
1,931✔
318

319
    } else {
320
      if (tsem_post(&pRequest->body.rspSem) != 0) {
×
321
        tscError("failed to post semaphore");
×
322
      }
323
    }
324

325
    return code;
1,931✔
326
  }
327

328
  SUseDbRsp usedbRsp = {0};
97,424,986✔
329
  if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
97,424,986✔
330
    tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
×
331
  }
332

333
  if (strlen(usedbRsp.db) == 0) {
97,416,940✔
334
    taosMemoryFree(pMsg->pData);
×
335
    taosMemoryFree(pMsg->pEpSet);
×
336

337
    if (usedbRsp.errCode != 0) {
×
338
      return usedbRsp.errCode;
×
339
    } else {
340
      return TSDB_CODE_APP_ERROR;
×
341
    }
342
  }
343

344
  tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
97,417,066✔
345
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
100,171,235✔
346
    SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
2,761,417✔
347
    if (pInfo == NULL) {
2,761,331✔
348
      continue;
×
349
    }
350
    tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
2,761,331✔
351
    for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
5,702,041✔
352
      tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
2,940,710✔
353
    }
354
  }
355

356
  SName name = {0};
97,409,818✔
357
  if (tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB) != TSDB_CODE_SUCCESS) {
97,409,818✔
358
    tscError("QID:0x%" PRIx64 ", failed to parse db name:%s", pRequest->requestId, usedbRsp.db);
×
359
  }
360

361
  SUseDbOutput output = {0};
97,393,485✔
362
  code = queryBuildUseDbOutput(&output, &usedbRsp);
97,393,485✔
363
  if (code != 0) {
97,410,483✔
364
    terrno = code;
×
365
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
×
366

367
    tscError("QID:0x%" PRIx64 ", failed to build use db output since %s", pRequest->requestId, terrstr());
×
368
  } else if (output.dbVgroup && output.dbVgroup->vgHash) {
97,410,483✔
369
    struct SCatalog* pCatalog = NULL;
1,275,518✔
370

371
    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,275,518✔
372
    if (code1 != TSDB_CODE_SUCCESS) {
1,275,518✔
373
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
×
374
              tstrerror(code1));
375
    } else {
376
      if (catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup) != 0) {
1,275,518✔
377
        tscError("QID:0x%" PRIx64 ", failed to update db vg info, db:%s, dbId:%" PRId64, pRequest->requestId, output.db,
×
378
                 output.dbId);
379
      }
380
      output.dbVgroup = NULL;
1,275,518✔
381
    }
382
  }
383

384
  taosMemoryFreeClear(output.dbVgroup);
97,382,877✔
385
  tFreeSUsedbRsp(&usedbRsp);
97,439,598✔
386

387
  char db[TSDB_DB_NAME_LEN] = {0};
97,384,990✔
388
  if (tNameGetDbName(&name, db) != TSDB_CODE_SUCCESS) {
97,384,904✔
389
    tscError("QID:0x%" PRIx64 ", failed to get db name since %s", pRequest->requestId, tstrerror(code));
×
390
  }
391

392
  setConnectionDB(pRequest->pTscObj, db);
97,376,398✔
393

394
  taosMemoryFree(pMsg->pData);
97,442,811✔
395
  taosMemoryFree(pMsg->pEpSet);
97,418,383✔
396

397
  if (pRequest->body.queryFp != NULL) {
97,439,014✔
398
    doRequestCallback(pRequest, pRequest->code);
97,438,965✔
399
  } else {
400
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
401
      tscError("failed to post semaphore");
×
402
    }
403
  }
404
  return 0;
97,423,913✔
405
}
406

407
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,051,838✔
408
  if (pMsg == NULL) {
2,051,838✔
409
    tscError("processCreateSTableRsp: invalid input param, pMsg is NULL");
×
410
    return TSDB_CODE_TSC_INVALID_INPUT;
×
411
  }
412
  if (param == NULL) {
2,051,838✔
413
    taosMemoryFree(pMsg->pEpSet);
×
414
    taosMemoryFree(pMsg->pData);
×
415
    tscError("processCreateSTableRsp: invalid input param, param is NULL");
×
416
    return TSDB_CODE_TSC_INVALID_INPUT;
×
417
  }
418

419
  SRequestObj* pRequest = param;
2,051,838✔
420

421
  if (code != TSDB_CODE_SUCCESS) {
2,051,838✔
422
    setErrno(pRequest, code);
10,919✔
423
  } else {
424
    SMCreateStbRsp createRsp = {0};
2,040,919✔
425
    SDecoder       coder = {0};
2,040,919✔
426
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
2,040,919✔
427
    if (pMsg->len > 0) {
2,040,919✔
428
      code = tDecodeSMCreateStbRsp(&coder, &createRsp);  // pMsg->len == 0
2,035,489✔
429
      if (code != TSDB_CODE_SUCCESS) {
2,035,489✔
430
        setErrno(pRequest, code);
×
431
      }
432
    }
433
    tDecoderClear(&coder);
2,040,919✔
434

435
    pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
2,040,919✔
436
    pRequest->body.resInfo.execRes.res = createRsp.pMeta;
2,040,919✔
437
  }
438

439
  taosMemoryFree(pMsg->pEpSet);
2,051,838✔
440
  taosMemoryFree(pMsg->pData);
2,051,838✔
441

442
  if (pRequest->body.queryFp != NULL) {
2,051,838✔
443
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
1,656,571✔
444

445
    if (code == TSDB_CODE_SUCCESS) {
1,656,571✔
446
      SCatalog* pCatalog = NULL;
1,651,003✔
447
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,651,003✔
448
      if (pRes->res != NULL) {
1,651,003✔
449
        ret = handleCreateTbExecRes(pRes->res, pCatalog);
1,647,750✔
450
      }
451

452
      if (ret != TSDB_CODE_SUCCESS) {
1,651,003✔
453
        code = ret;
×
454
      }
455
    }
456

457
    doRequestCallback(pRequest, code);
1,656,571✔
458
  } else {
459
    if (tsem_post(&pRequest->body.rspSem) != 0) {
395,267✔
460
      tscError("failed to post semaphore");
×
461
    }
462
  }
463
  return code;
2,051,838✔
464
}
465

466
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
1,342,823✔
467
  SRequestObj* pRequest = param;
1,342,823✔
468
  if (code != TSDB_CODE_SUCCESS) {
1,342,823✔
469
    setErrno(pRequest, code);
6,411✔
470
  } else {
471
    SDropDbRsp dropdbRsp = {0};
1,336,412✔
472
    if (tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp) != 0) {
1,336,412✔
473
      tscError("QID:0x%" PRIx64 ", deserialize SDropDbRsp failed", pRequest->requestId);
×
474
    }
475
    struct SCatalog* pCatalog = NULL;
1,336,412✔
476
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,336,412✔
477
    if (TSDB_CODE_SUCCESS == code) {
1,336,412✔
478
      if (catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid) != 0) {
1,336,412✔
479
        tscError("QID:0x%" PRIx64 ", failed to remove db:%s", pRequest->requestId, dropdbRsp.db);
×
480
      }
481
      STscObj* pTscObj = pRequest->pTscObj;
1,336,412✔
482

483
      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
1,336,412✔
484
                               .requestId = pRequest->requestId,
1,336,412✔
485
                               .requestObjRefId = pRequest->self,
1,336,307✔
486
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
1,336,412✔
487
      char             dbFName[TSDB_DB_FNAME_LEN] = {0};
1,336,412✔
488
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
1,336,412✔
489
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != TSDB_CODE_SUCCESS) {
1,336,412✔
490
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
×
491
      }
492
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
1,336,412✔
493
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
1,336,412✔
494
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
×
495
      }
496
    }
497
  }
498

499
  taosMemoryFree(pMsg->pData);
1,342,823✔
500
  taosMemoryFree(pMsg->pEpSet);
1,342,823✔
501

502
  if (pRequest->body.queryFp != NULL) {
1,342,823✔
503
    doRequestCallback(pRequest, code);
1,342,823✔
504
  } else {
505
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
506
      tscError("failed to post semaphore");
×
507
    }
508
  }
509
  return code;
1,342,823✔
510
}
511

512
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
5,900,008✔
513
  SRequestObj* pRequest = param;
5,900,008✔
514
  if (code != TSDB_CODE_SUCCESS) {
5,900,008✔
515
    setErrno(pRequest, code);
1,052,365✔
516
  } else {
517
    SMAlterStbRsp alterRsp = {0};
4,847,643✔
518
    SDecoder      coder = {0};
4,847,643✔
519
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
4,847,643✔
520
    if (pMsg->len > 0) {
4,847,643✔
521
      code = tDecodeSMAlterStbRsp(&coder, &alterRsp);  // pMsg->len == 0
4,821,612✔
522
      if (code != TSDB_CODE_SUCCESS) {
4,821,612✔
523
        setErrno(pRequest, code);
×
524
      }
525
    }
526
    tDecoderClear(&coder);
4,847,643✔
527

528
    pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
4,847,643✔
529
    pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
4,847,643✔
530
  }
531

532
  taosMemoryFree(pMsg->pData);
5,900,008✔
533
  taosMemoryFree(pMsg->pEpSet);
5,900,008✔
534

535
  if (pRequest->body.queryFp != NULL) {
5,900,008✔
536
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
5,900,008✔
537

538
    if (code == TSDB_CODE_SUCCESS) {
5,900,008✔
539
      SCatalog* pCatalog = NULL;
4,847,643✔
540
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
4,847,643✔
541
      if (pRes->res != NULL) {
4,847,643✔
542
        ret = handleAlterTbExecRes(pRes->res, pCatalog);
4,821,612✔
543
      }
544

545
      if (ret != TSDB_CODE_SUCCESS) {
4,847,643✔
546
        code = ret;
×
547
      }
548
    }
549

550
    doRequestCallback(pRequest, code);
5,900,008✔
551
  } else {
552
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
553
      tscError("failed to post semaphore");
×
554
    }
555
  }
556
  return code;
5,900,008✔
557
}
558

559
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
9,740✔
560
  int32_t      code = 0;
9,740✔
561
  int32_t      line = 0;
9,740✔
562
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
9,740✔
563
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
9,740✔
564
  pBlock->info.hasVarCol = true;
9,740✔
565

566
  pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
9,740✔
567
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
9,740✔
568
  SColumnInfoData infoData = {0};
9,740✔
569
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
9,740✔
570
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN;
9,740✔
571
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
19,480✔
572

573
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
9,740✔
574
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN;
9,740✔
575
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
19,480✔
576

577
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
9,740✔
578
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD3_LEN;
9,740✔
579
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
19,480✔
580

581
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
9,740✔
582
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD4_LEN;
9,740✔
583
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
19,480✔
584

585
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
9,740✔
586
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD5_LEN;
9,740✔
587
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
19,480✔
588

589
  int32_t numOfCfg = taosArrayGetSize(pVars);
9,740✔
590
  code = blockDataEnsureCapacity(pBlock, numOfCfg);
9,740✔
591
  TSDB_CHECK_CODE(code, line, END);
9,740✔
592

593
  for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
928,942✔
594
    SVariablesInfo* pInfo = taosArrayGet(pVars, i);
919,202✔
595
    TSDB_CHECK_NULL(pInfo, code, line, END, terrno);
919,202✔
596

597
    char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
919,202✔
598
    STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
919,202✔
599
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
919,202✔
600
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
919,202✔
601
    code = colDataSetVal(pColInfo, i, name, false);
919,202✔
602
    TSDB_CHECK_CODE(code, line, END);
919,202✔
603

604
    char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
919,202✔
605
    STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
919,202✔
606
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
919,202✔
607
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
919,202✔
608
    code = colDataSetVal(pColInfo, i, value, false);
919,202✔
609
    TSDB_CHECK_CODE(code, line, END);
919,202✔
610

611
    char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
919,202✔
612
    STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
919,202✔
613
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
919,202✔
614
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
919,202✔
615
    code = colDataSetVal(pColInfo, i, scope, false);
919,202✔
616
    TSDB_CHECK_CODE(code, line, END);
919,202✔
617

618
    char category[TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE] = {0};
919,202✔
619
    STR_WITH_MAXSIZE_TO_VARSTR(category, pInfo->category, TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE);
919,202✔
620
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
919,202✔
621
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
919,202✔
622
    code = colDataSetVal(pColInfo, i, category, false);
919,202✔
623
    TSDB_CHECK_CODE(code, line, END);
919,202✔
624

625
    char info[TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE] = {0};
919,202✔
626
    STR_WITH_MAXSIZE_TO_VARSTR(info, pInfo->info, TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE);
919,202✔
627
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
919,202✔
628
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
919,202✔
629
    code = colDataSetVal(pColInfo, i, info, false);
919,202✔
630
    TSDB_CHECK_CODE(code, line, END);
919,202✔
631
  }
632

633
  pBlock->info.rows = numOfCfg;
9,740✔
634

635
  *block = pBlock;
9,740✔
636
  return code;
9,740✔
637

638
END:
×
639
  taosArrayDestroy(pBlock->pDataBlock);
×
640
  taosMemoryFree(pBlock);
×
641
  return code;
×
642
}
643

644
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
9,740✔
645
  SSDataBlock* pBlock = NULL;
9,740✔
646
  int32_t      code = buildShowVariablesBlock(pVars, &pBlock);
9,740✔
647
  if (code) {
9,740✔
648
    return code;
×
649
  }
650

651
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
9,740✔
652
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
9,740✔
653
  *pRsp = taosMemoryCalloc(1, rspSize);
9,740✔
654
  if (NULL == *pRsp) {
9,740✔
655
    code = terrno;
×
656
    goto _exit;
×
657
  }
658

659
  (*pRsp)->useconds = 0;
9,740✔
660
  (*pRsp)->completed = 1;
9,740✔
661
  (*pRsp)->precision = 0;
9,740✔
662
  (*pRsp)->compressed = 0;
9,740✔
663

664
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
9,740✔
665
  (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
9,740✔
666

667
  int32_t len = 0;
9,740✔
668
  if ((*pRsp)->numOfRows > 0) {
9,740✔
669
    len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SHOW_VARIABLES_RESULT_COLS);
9,212✔
670
    if (len < 0) {
9,212✔
671
      uError("buildShowVariablesRsp error, len:%d", len);
×
672
      code = terrno;
×
673
      goto _exit;
×
674
    }
675
    SET_PAYLOAD_LEN((*pRsp)->data, len, len);
9,212✔
676

677
    int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
9,212✔
678
    (*pRsp)->payloadLen = htonl(payloadLen);
9,212✔
679
    (*pRsp)->compLen = htonl(payloadLen);
9,212✔
680

681
    if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
9,212✔
682
      uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
×
683
             (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
684
      code = TSDB_CODE_TSC_INVALID_INPUT;
×
685
      goto _exit;
×
686
    }
687
  }
688

689
  blockDataDestroy(pBlock);
9,740✔
690
  pBlock = NULL;
9,740✔
691

692
  return TSDB_CODE_SUCCESS;
9,740✔
693
_exit:
×
694
  if (*pRsp) {
×
695
    taosMemoryFree(*pRsp);
×
696
    *pRsp = NULL;
×
697
  }
698
  if (pBlock) {
×
699
    blockDataDestroy(pBlock);
×
700
    pBlock = NULL;
×
701
  }
702
  return code;
×
703
}
704

705
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
9,740✔
706
  SRequestObj* pRequest = param;
9,740✔
707
  if (code != TSDB_CODE_SUCCESS) {
9,740✔
708
    setErrno(pRequest, code);
×
709
  } else {
710
    SShowVariablesRsp  rsp = {0};
9,740✔
711
    SRetrieveTableRsp* pRes = NULL;
9,740✔
712
    code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp);
9,740✔
713
    if (TSDB_CODE_SUCCESS == code) {
9,740✔
714
      code = buildShowVariablesRsp(rsp.variables, &pRes);
9,740✔
715
    }
716
    if (TSDB_CODE_SUCCESS == code) {
9,740✔
717
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->stmtBindVersion > 0);
9,740✔
718
    }
719

720
    if (code != 0) {
9,740✔
721
      pRequest->body.resInfo.pRspMsg = NULL;
×
722
      taosMemoryFree(pRes);
×
723
    }
724
    tFreeSShowVariablesRsp(&rsp);
9,740✔
725
  }
726

727
  taosMemoryFree(pMsg->pData);
9,740✔
728
  taosMemoryFree(pMsg->pEpSet);
9,740✔
729

730
  if (pRequest->body.queryFp != NULL) {
9,740✔
731
    doRequestCallback(pRequest, code);
9,740✔
732
  } else {
733
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
734
      tscError("failed to post semaphore");
×
735
    }
736
  }
737
  return code;
9,740✔
738
}
739

740
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
50,923✔
741
  int32_t      code = 0;
50,923✔
742
  int32_t      line = 0;
50,923✔
743
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
50,923✔
744
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
50,923✔
745
  pBlock->info.hasVarCol = true;
50,923✔
746

747
  pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
50,923✔
748
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
50,923✔
749
  SColumnInfoData infoData = {0};
50,923✔
750
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
50,923✔
751
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
50,923✔
752
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
101,846✔
753

754
  infoData.info.type = TSDB_DATA_TYPE_INT;
50,923✔
755
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
50,923✔
756
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
101,846✔
757

758
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
50,923✔
759
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
50,923✔
760
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
101,846✔
761

762
  code = blockDataEnsureCapacity(pBlock, 1);
50,923✔
763
  TSDB_CHECK_CODE(code, line, END);
50,923✔
764

765
  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
50,923✔
766
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
50,923✔
767
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
50,923✔
768
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
50,923✔
769
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
50,923✔
770
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);
50,923✔
771

772
  char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
50,923✔
773
  char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
50,923✔
774
  if (pRsp->bAccepted) {
50,923✔
775
    STR_TO_VARSTR(result, "accepted");
50,923✔
776
    code = colDataSetVal(pResultCol, 0, result, false);
50,923✔
777
    TSDB_CHECK_CODE(code, line, END);
50,923✔
778
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
50,923✔
779
    TSDB_CHECK_CODE(code, line, END);
50,923✔
780
    STR_TO_VARSTR(reason, "success");
50,923✔
781
    code = colDataSetVal(pReasonCol, 0, reason, false);
50,923✔
782
    TSDB_CHECK_CODE(code, line, END);
50,923✔
783
  } else {
784
    STR_TO_VARSTR(result, "rejected");
×
785
    code = colDataSetVal(pResultCol, 0, result, false);
×
786
    TSDB_CHECK_CODE(code, line, END);
×
787
    colDataSetNULL(pIdCol, 0);
788
    STR_TO_VARSTR(reason, "compaction is ongoing");
×
789
    code = colDataSetVal(pReasonCol, 0, reason, false);
×
790
    TSDB_CHECK_CODE(code, line, END);
×
791
  }
792
  pBlock->info.rows = 1;
50,923✔
793

794
  *block = pBlock;
50,923✔
795

796
  return TSDB_CODE_SUCCESS;
50,923✔
797
END:
×
798
  taosMemoryFree(pBlock);
×
799
  taosArrayDestroy(pBlock->pDataBlock);
×
800
  return code;
×
801
}
802

803
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
50,923✔
804
  SSDataBlock* pBlock = NULL;
50,923✔
805
  int32_t      code = buildCompactDbBlock(pCompactDb, &pBlock);
50,923✔
806
  if (code) {
50,923✔
807
    return code;
×
808
  }
809

810
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
50,923✔
811
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
50,923✔
812
  *pRsp = taosMemoryCalloc(1, rspSize);
50,923✔
813
  if (NULL == *pRsp) {
50,923✔
814
    code = terrno;
×
815
    goto _exit;
×
816
  }
817

818
  (*pRsp)->useconds = 0;
50,923✔
819
  (*pRsp)->completed = 1;
50,923✔
820
  (*pRsp)->precision = 0;
50,923✔
821
  (*pRsp)->compressed = 0;
50,923✔
822
  (*pRsp)->compLen = 0;
50,923✔
823
  (*pRsp)->payloadLen = 0;
50,923✔
824
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
50,923✔
825
  (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
50,923✔
826

827
  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, COMPACT_DB_RESULT_COLS);
50,923✔
828
  if (len < 0) {
50,923✔
829
    uError("buildRetriveTableRspForCompactDb error, len:%d", len);
×
830
    code = terrno;
×
831
    goto _exit;
×
832
  }
833
  blockDataDestroy(pBlock);
50,923✔
834

835
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);
50,923✔
836

837
  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
50,923✔
838
  (*pRsp)->payloadLen = htonl(payloadLen);
50,923✔
839
  (*pRsp)->compLen = htonl(payloadLen);
50,923✔
840

841
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
50,923✔
842
    uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
×
843
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
844
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
845
    goto _exit;
×
846
  }
847

848
  return TSDB_CODE_SUCCESS;
50,923✔
849
_exit:
×
850
  if (*pRsp) {
×
851
    taosMemoryFree(*pRsp);
×
852
    *pRsp = NULL;
×
853
  }
854
  if (pBlock) {
×
855
    blockDataDestroy(pBlock);
×
856
    pBlock = NULL;
×
857
  }
858
  return code;
×
859
}
860

861
int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
36,495✔
862
  SRequestObj* pRequest = param;
36,495✔
863
  if (code != TSDB_CODE_SUCCESS) {
36,495✔
864
    setErrno(pRequest, code);
2,083✔
865
  } else {
866
    SCompactDbRsp      rsp = {0};
34,412✔
867
    SRetrieveTableRsp* pRes = NULL;
34,412✔
868
    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, &rsp);
34,412✔
869
    if (TSDB_CODE_SUCCESS == code) {
34,412✔
870
      code = buildRetriveTableRspForCompactDb(&rsp, &pRes);
34,412✔
871
    }
872
    if (TSDB_CODE_SUCCESS == code) {
34,412✔
873
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->stmtBindVersion > 0);
34,412✔
874
    }
875

876
    if (code != 0) {
34,412✔
877
      pRequest->body.resInfo.pRspMsg = NULL;
×
878
      taosMemoryFree(pRes);
×
879
    }
880
  }
881

882
  taosMemoryFree(pMsg->pData);
36,495✔
883
  taosMemoryFree(pMsg->pEpSet);
36,495✔
884

885
  if (pRequest->body.queryFp != NULL) {
36,495✔
886
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
36,495✔
887
  } else {
888
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
889
      tscError("failed to post semaphore");
×
890
    }
891
  }
892
  return code;
36,495✔
893
}
894

895
static int32_t buildScanDbBlock(SScanDbRsp* pRsp, SSDataBlock** block) {
401✔
896
  int32_t      code = 0;
401✔
897
  int32_t      line = 0;
401✔
898
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
401✔
899
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
401✔
900
  pBlock->info.hasVarCol = true;
401✔
901

902
  pBlock->pDataBlock = taosArrayInit(SCAN_DB_RESULT_COLS, sizeof(SColumnInfoData));
401✔
903
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
401✔
904
  SColumnInfoData infoData = {0};
401✔
905
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
401✔
906
  infoData.info.bytes = SCAN_DB_RESULT_FIELD1_LEN;
401✔
907
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
802✔
908

909
  infoData.info.type = TSDB_DATA_TYPE_INT;
401✔
910
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
401✔
911
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
802✔
912

913
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
401✔
914
  infoData.info.bytes = SCAN_DB_RESULT_FIELD3_LEN;
401✔
915
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
802✔
916

917
  code = blockDataEnsureCapacity(pBlock, 1);
401✔
918
  TSDB_CHECK_CODE(code, line, END);
401✔
919

920
  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
401✔
921
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
401✔
922
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
401✔
923
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
401✔
924
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
401✔
925
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);
401✔
926

927
  char result[SCAN_DB_RESULT_FIELD1_LEN] = {0};
401✔
928
  char reason[SCAN_DB_RESULT_FIELD3_LEN] = {0};
401✔
929
  if (pRsp->bAccepted) {
401✔
930
    STR_TO_VARSTR(result, "accepted");
401✔
931
    code = colDataSetVal(pResultCol, 0, result, false);
401✔
932
    TSDB_CHECK_CODE(code, line, END);
401✔
933
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->scanId, false);
401✔
934
    TSDB_CHECK_CODE(code, line, END);
401✔
935
    STR_TO_VARSTR(reason, "success");
401✔
936
    code = colDataSetVal(pReasonCol, 0, reason, false);
401✔
937
    TSDB_CHECK_CODE(code, line, END);
401✔
938
  } else {
939
    STR_TO_VARSTR(result, "rejected");
×
940
    code = colDataSetVal(pResultCol, 0, result, false);
×
941
    TSDB_CHECK_CODE(code, line, END);
×
942
    colDataSetNULL(pIdCol, 0);
943
    STR_TO_VARSTR(reason, "scan is ongoing");
×
944
    code = colDataSetVal(pReasonCol, 0, reason, false);
×
945
    TSDB_CHECK_CODE(code, line, END);
×
946
  }
947
  pBlock->info.rows = 1;
401✔
948

949
  *block = pBlock;
401✔
950

951
  return TSDB_CODE_SUCCESS;
401✔
952
END:
×
953
  taosMemoryFree(pBlock);
×
954
  taosArrayDestroy(pBlock->pDataBlock);
×
955
  return code;
×
956
}
957

958
static int32_t buildRetriveTableRspForScanDb(SScanDbRsp* pScanDb, SRetrieveTableRsp** pRsp) {
401✔
959
  SSDataBlock* pBlock = NULL;
401✔
960
  int32_t      code = buildScanDbBlock(pScanDb, &pBlock);
401✔
961
  if (code) {
401✔
962
    return code;
×
963
  }
964

965
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
401✔
966
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
401✔
967
  *pRsp = taosMemoryCalloc(1, rspSize);
401✔
968
  if (NULL == *pRsp) {
401✔
969
    code = terrno;
×
970
    goto _exit;
×
971
  }
972

973
  (*pRsp)->useconds = 0;
401✔
974
  (*pRsp)->completed = 1;
401✔
975
  (*pRsp)->precision = 0;
401✔
976
  (*pRsp)->compressed = 0;
401✔
977
  (*pRsp)->compLen = 0;
401✔
978
  (*pRsp)->payloadLen = 0;
401✔
979
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
401✔
980
  (*pRsp)->numOfCols = htonl(SCAN_DB_RESULT_COLS);
401✔
981

982
  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SCAN_DB_RESULT_COLS);
401✔
983
  if (len < 0) {
401✔
984
    uError("%s error, len:%d", __func__, len);
×
985
    code = terrno;
×
986
    goto _exit;
×
987
  }
988
  blockDataDestroy(pBlock);
401✔
989

990
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);
401✔
991

992
  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
401✔
993
  (*pRsp)->payloadLen = htonl(payloadLen);
401✔
994
  (*pRsp)->compLen = htonl(payloadLen);
401✔
995

996
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
401✔
997
    uError("%s error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, __func__, len,
×
998
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
999
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
1000
    goto _exit;
×
1001
  }
1002

1003
  return TSDB_CODE_SUCCESS;
401✔
1004
_exit:
×
1005
  if (*pRsp) {
×
1006
    taosMemoryFree(*pRsp);
×
1007
    *pRsp = NULL;
×
1008
  }
1009
  if (pBlock) {
×
1010
    blockDataDestroy(pBlock);
×
1011
    pBlock = NULL;
×
1012
  }
1013
  return code;
×
1014
}
1015

1016
static int32_t processScanDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
676✔
1017
  SRequestObj* pRequest = param;
676✔
1018
  if (code != TSDB_CODE_SUCCESS) {
676✔
1019
    setErrno(pRequest, code);
275✔
1020
  } else {
1021
    SScanDbRsp         rsp = {0};
401✔
1022
    SRetrieveTableRsp* pRes = NULL;
401✔
1023
    code = tDeserializeSScanDbRsp(pMsg->pData, pMsg->len, &rsp);
401✔
1024
    if (TSDB_CODE_SUCCESS == code) {
401✔
1025
      code = buildRetriveTableRspForScanDb(&rsp, &pRes);
401✔
1026
    }
1027
    if (TSDB_CODE_SUCCESS == code) {
401✔
1028
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->stmtBindVersion > 0);
401✔
1029
    }
1030

1031
    if (code != 0) {
401✔
1032
      pRequest->body.resInfo.pRspMsg = NULL;
×
1033
      taosMemoryFree(pRes);
×
1034
    }
1035
  }
1036

1037
  taosMemoryFree(pMsg->pData);
676✔
1038
  taosMemoryFree(pMsg->pEpSet);
676✔
1039

1040
  if (pRequest->body.queryFp != NULL) {
676✔
1041
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
676✔
1042
  } else {
1043
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
1044
      tscError("failed to post semaphore");
×
1045
    }
1046
  }
1047
  return code;
676✔
1048
}
1049

1050
int32_t processTrimDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
23,459✔
1051
  SRequestObj* pRequest = param;
23,459✔
1052
  if (code != TSDB_CODE_SUCCESS) {
23,459✔
1053
    setErrno(pRequest, code);
6,948✔
1054
  } else {
1055
    STrimDbRsp         rsp = {0};
16,511✔
1056
    SRetrieveTableRsp* pRes = NULL;
16,511✔
1057
    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, (SCompactDbRsp*)&rsp);
16,511✔
1058
    if (TSDB_CODE_SUCCESS == code) {
16,511✔
1059
      code = buildRetriveTableRspForCompactDb(&rsp, &pRes);
16,511✔
1060
    }
1061
    if (TSDB_CODE_SUCCESS == code) {
16,511✔
1062
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->stmtBindVersion > 0);
16,511✔
1063
    }
1064

1065
    if (code != 0) {
16,511✔
1066
      pRequest->body.resInfo.pRspMsg = NULL;
×
1067
      taosMemoryFree(pRes);
×
1068
    }
1069
  }
1070

1071
  taosMemoryFree(pMsg->pData);
23,459✔
1072
  taosMemoryFree(pMsg->pEpSet);
23,459✔
1073

1074
  if (pRequest->body.queryFp != NULL) {
23,459✔
1075
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
23,459✔
1076
  } else {
1077
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
1078
      tscError("failed to post semaphore");
×
1079
    }
1080
  }
1081
  return code;
23,459✔
1082
}
1083

1084
static int32_t buildCreateTokenBlock(SCreateTokenRsp* pRsp, SSDataBlock** block) {
20,233✔
1085
  int32_t      code = 0;
20,233✔
1086
  int32_t      line = 0;
20,233✔
1087
  SSDataBlock* pBlock = taosMemoryCalloc(CREATE_TOKEN_RESULT_COLS, sizeof(SSDataBlock));
20,233✔
1088
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
20,233✔
1089
  pBlock->info.hasVarCol = true;
20,233✔
1090

1091
  pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
20,233✔
1092
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
20,233✔
1093
  SColumnInfoData infoData = {0};
20,233✔
1094
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
20,233✔
1095
  infoData.info.bytes = CREATE_TOKEN_RESULT_FIELD1_LEN;
20,233✔
1096
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
40,466✔
1097

1098
  // Handle empty result case (when pRsp is NULL)
1099
  if (pRsp == NULL) {
20,233✔
1100
    pBlock->info.rows = 0;
179✔
1101
    *block = pBlock;
179✔
1102
    return TSDB_CODE_SUCCESS;
179✔
1103
  }
1104

1105
  code = blockDataEnsureCapacity(pBlock, 1);
20,054✔
1106
  TSDB_CHECK_CODE(code, line, END);
20,054✔
1107

1108
  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
20,054✔
1109
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
20,054✔
1110

1111
  char result[sizeof(pRsp->token) + 64] = {0};
20,054✔
1112
  STR_TO_VARSTR(result, pRsp->token);
20,054✔
1113
  code = colDataSetVal(pResultCol, 0, result, false);
20,054✔
1114
  TSDB_CHECK_CODE(code, line, END);
20,054✔
1115

1116
  pBlock->info.rows = 1;
20,054✔
1117

1118
  *block = pBlock;
20,054✔
1119
  return TSDB_CODE_SUCCESS;
20,054✔
1120

1121
END:
×
1122
  if (pBlock) {
×
1123
    taosArrayDestroy(pBlock->pDataBlock);
×
1124
    taosMemoryFree(pBlock);
×
1125
  }
1126
  return code;
×
1127
}
1128

1129
static int32_t buildTableRspForCreateToken(SCreateTokenRsp* pResp, SRetrieveTableRsp** pRsp) {
20,233✔
1130
  SSDataBlock* pBlock = NULL;
20,233✔
1131
  int32_t      code = buildCreateTokenBlock(pResp, &pBlock);
20,233✔
1132
  if (code) {
20,233✔
1133
    return code;
×
1134
  }
1135

1136
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
20,233✔
1137
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
20,233✔
1138
  *pRsp = taosMemoryCalloc(1, rspSize);
20,233✔
1139
  if (NULL == *pRsp) {
20,233✔
1140
    code = terrno;
×
1141
    goto _exit;
×
1142
  }
1143

1144
  (*pRsp)->useconds = 0;
20,233✔
1145
  (*pRsp)->completed = 1;
20,233✔
1146
  (*pRsp)->precision = 0;
20,233✔
1147
  (*pRsp)->compressed = 0;
20,233✔
1148

1149
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
20,233✔
1150
  (*pRsp)->numOfCols = htonl(CREATE_TOKEN_RESULT_COLS);
20,233✔
1151

1152
  int32_t len = 0;
20,233✔
1153
  if (pBlock->info.rows > 0) {
20,233✔
1154
    len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, CREATE_TOKEN_RESULT_COLS);
20,054✔
1155
    if (len < 0) {
20,054✔
1156
      uError("buildTableRspFroCreateToken error, len:%d", len);
×
1157
      code = terrno;
×
1158
      goto _exit;
×
1159
    }
1160

1161
    SET_PAYLOAD_LEN((*pRsp)->data, len, len);
20,054✔
1162

1163
    int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
20,054✔
1164
    (*pRsp)->payloadLen = htonl(payloadLen);
20,054✔
1165
    (*pRsp)->compLen = htonl(payloadLen);
20,054✔
1166

1167
    if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
20,054✔
1168
      uError("buildTableRspFroCreateToken error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
×
1169
             (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
1170
      code = TSDB_CODE_TSC_INVALID_INPUT;
×
1171
      goto _exit;
×
1172
    }
1173
  } else {
1174
    // Empty result case
1175
    SET_PAYLOAD_LEN((*pRsp)->data, 0, 0);
179✔
1176
    (*pRsp)->payloadLen = htonl(PAYLOAD_PREFIX_LEN);
179✔
1177
    (*pRsp)->compLen = htonl(PAYLOAD_PREFIX_LEN);
179✔
1178
  }
1179
  blockDataDestroy(pBlock);
20,233✔
1180
  return TSDB_CODE_SUCCESS;
20,233✔
1181

1182
_exit:
×
1183
  if (*pRsp) {
×
1184
    taosMemoryFree(*pRsp);
×
1185
    *pRsp = NULL;
×
1186
  }
1187
  if (pBlock) {
×
1188
    blockDataDestroy(pBlock);
×
1189
    pBlock = NULL;
×
1190
  }
1191
  return code;
×
1192
}
1193

1194
int32_t processCreateTokenRsp(void* param, SDataBuf* pMsg, int32_t code) {
24,159✔
1195
  SRequestObj* pRequest = param;
24,159✔
1196
  if (code != TSDB_CODE_SUCCESS) {
24,159✔
1197
    setErrno(pRequest, code);
3,926✔
1198
  } else {
1199
    SCreateTokenRsp    rsp = {0};
20,233✔
1200
    SRetrieveTableRsp* pRes = NULL;
20,233✔
1201
    
1202
    // Handle empty message case
1203
    if (pMsg->len == 0) {
20,233✔
1204
      code = buildTableRspForCreateToken(NULL, &pRes);
179✔
1205
    } else {
1206
      code = tDeserializeSCreateTokenResp(pMsg->pData, pMsg->len, &rsp);
20,054✔
1207
      if (TSDB_CODE_SUCCESS == code) {
20,054✔
1208
        code = buildTableRspForCreateToken(&rsp, &pRes);
20,054✔
1209
      }
1210
    }
1211
    
1212
    if (TSDB_CODE_SUCCESS == code) {
20,233✔
1213
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->stmtBindVersion > 0);
20,233✔
1214
    }
1215

1216
    if (code != 0) {
20,233✔
1217
      pRequest->body.resInfo.pRspMsg = NULL;
×
1218
      taosMemoryFree(pRes);
×
1219
    }
1220
  }
1221

1222
  taosMemoryFree(pMsg->pData);
24,159✔
1223
  taosMemoryFree(pMsg->pEpSet);
24,159✔
1224

1225
  if (pRequest->body.queryFp != NULL) {
24,159✔
1226
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
24,159✔
1227
  } else if (tsem_post(&pRequest->body.rspSem) != 0) {
×
1228
    tscError("failed to post semaphore");
×
1229
  }
1230
  return code;
24,159✔
1231
}
1232

1233
static int32_t buildCreateTotpSecretBlock(SCreateTotpSecretRsp* pRsp, SSDataBlock** block) {
18,190✔
1234
  int32_t      code = 0;
18,190✔
1235
  int32_t      line = 0;
18,190✔
1236
  SSDataBlock* pBlock = taosMemoryCalloc(CREATE_TOTP_SECRET_RESULT_COLS, sizeof(SSDataBlock));
18,190✔
1237
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
18,190✔
1238
  pBlock->info.hasVarCol = true;
18,190✔
1239

1240
  pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
18,190✔
1241
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
18,190✔
1242
  SColumnInfoData infoData = {0};
18,190✔
1243
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
18,190✔
1244
  infoData.info.bytes = CREATE_TOTP_SECRET_RESULT_FIELD1_LEN;
18,190✔
1245
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
36,380✔
1246

1247
  code = blockDataEnsureCapacity(pBlock, 1);
18,190✔
1248
  TSDB_CHECK_CODE(code, line, END);
18,190✔
1249

1250
  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
18,190✔
1251
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
18,190✔
1252

1253
  char result[sizeof(pRsp->totpSecret) + 64] = {0};
18,190✔
1254
  STR_TO_VARSTR(result, pRsp->totpSecret);
18,190✔
1255
  code = colDataSetVal(pResultCol, 0, result, false);
18,190✔
1256
  TSDB_CHECK_CODE(code, line, END);
18,190✔
1257

1258
  pBlock->info.rows = 1;
18,190✔
1259

1260
  *block = pBlock;
18,190✔
1261
  return TSDB_CODE_SUCCESS;
18,190✔
1262

1263
END:
×
1264
  if (pBlock) {
×
1265
    taosArrayDestroy(pBlock->pDataBlock);
×
1266
    taosMemoryFree(pBlock);
×
1267
  }
1268
  return code;
×
1269
}
1270

1271
static int32_t buildTableRspForCreateTotpSecret(SCreateTotpSecretRsp* pResp, SRetrieveTableRsp** pRsp) {
18,190✔
1272
  SSDataBlock* pBlock = NULL;
18,190✔
1273
  int32_t      code = buildCreateTotpSecretBlock(pResp, &pBlock);
18,190✔
1274
  if (code) {
18,190✔
1275
    return code;
×
1276
  }
1277

1278
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
18,190✔
1279
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
18,190✔
1280
  *pRsp = taosMemoryCalloc(1, rspSize);
18,190✔
1281
  if (NULL == *pRsp) {
18,190✔
1282
    code = terrno;
×
1283
    goto _exit;
×
1284
  }
1285

1286
  (*pRsp)->useconds = 0;
18,190✔
1287
  (*pRsp)->completed = 1;
18,190✔
1288
  (*pRsp)->precision = 0;
18,190✔
1289
  (*pRsp)->compressed = 0;
18,190✔
1290

1291
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
18,190✔
1292
  (*pRsp)->numOfCols = htonl(CREATE_TOTP_SECRET_RESULT_COLS);
18,190✔
1293

1294
  int32_t len =
1295
      blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, CREATE_TOTP_SECRET_RESULT_COLS);
18,190✔
1296
  if (len < 0) {
18,190✔
1297
    uError("buildTableRspFroCreateTotpSecret error, len:%d", len);
×
1298
    code = terrno;
×
1299
    goto _exit;
×
1300
  }
1301

1302
  blockDataDestroy(pBlock);
18,190✔
1303
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);
18,190✔
1304

1305
  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
18,190✔
1306
  (*pRsp)->payloadLen = htonl(payloadLen);
18,190✔
1307
  (*pRsp)->compLen = htonl(payloadLen);
18,190✔
1308

1309
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
18,190✔
1310
    uError("buildTableRspFroCreateTotpSecret error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
×
1311
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
1312
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
1313
    goto _exit;
×
1314
  }
1315
  return TSDB_CODE_SUCCESS;
18,190✔
1316

1317
_exit:
×
1318
  if (*pRsp) {
×
1319
    taosMemoryFree(*pRsp);
×
1320
    *pRsp = NULL;
×
1321
  }
1322
  if (pBlock) {
×
1323
    blockDataDestroy(pBlock);
×
1324
    pBlock = NULL;
×
1325
  }
1326
  return code;
×
1327
}
1328

1329
int32_t processCreateTotpSecretRsp(void* param, SDataBuf* pMsg, int32_t code) {
18,358✔
1330
  SRequestObj* pRequest = param;
18,358✔
1331
  if (code != TSDB_CODE_SUCCESS) {
18,358✔
1332
    setErrno(pRequest, code);
168✔
1333
  } else {
1334
    SCreateTotpSecretRsp    rsp = {0};
18,190✔
1335
    SRetrieveTableRsp* pRes = NULL;
18,190✔
1336
    code = tDeserializeSCreateTotpSecretRsp(pMsg->pData, pMsg->len, &rsp);
18,190✔
1337
    if (TSDB_CODE_SUCCESS == code) {
18,190✔
1338
      code = buildTableRspForCreateTotpSecret(&rsp, &pRes);
18,190✔
1339
    }
1340
    if (TSDB_CODE_SUCCESS == code) {
18,190✔
1341
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->stmtBindVersion > 0);
18,190✔
1342
    }
1343

1344
    if (code != 0) {
18,190✔
1345
      pRequest->body.resInfo.pRspMsg = NULL;
×
1346
      taosMemoryFree(pRes);
×
1347
    }
1348
  }
1349

1350
  taosMemoryFree(pMsg->pData);
18,358✔
1351
  taosMemoryFree(pMsg->pEpSet);
18,358✔
1352

1353
  if (pRequest->body.queryFp != NULL) {
18,358✔
1354
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
18,358✔
1355
  } else if (tsem_post(&pRequest->body.rspSem) != 0) {
×
1356
    tscError("failed to post semaphore");
×
1357
  }
1358
  return code;
18,358✔
1359
}
1360

1361
int32_t processCreateXnodeTaskRsp(void* param, SDataBuf* pMsg, int32_t code) {
4,653✔
1362
  SRequestObj* pRequest = param;
4,653✔
1363
  if (code != TSDB_CODE_SUCCESS) {
4,653✔
1364
    setErrno(pRequest, code);
4,653✔
1365
    if (code == TSDB_CODE_MND_XNODE_HTTP_CODE_ERROR) {
4,653✔
1366
      if (pMsg->pData != NULL && pMsg->len > 0) {
×
1367
        if (pMsg->len <= pRequest->msgBufLen) {
×
1368
          tstrncpy(pRequest->msgBuf, (char*)pMsg->pData, pRequest->msgBufLen);
×
1369
        } else {
1370
          taosMemoryFreeClear(pRequest->msgBuf);
×
1371
          pRequest->msgBuf = pMsg->pData;
×
1372
          pMsg->pData = NULL;
×
1373
          pRequest->msgBufLen = pMsg->len;
×
1374
        }
1375
      }
1376
    }
1377
  }
1378

1379
  if (pMsg->pData) {
4,653✔
1380
    taosMemoryFree(pMsg->pData);
×
1381
  }
1382
  taosMemoryFree(pMsg->pEpSet);
4,653✔
1383

1384
  if (pRequest->body.queryFp != NULL) {
4,653✔
1385
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
4,653✔
1386
  } else if (tsem_post(&pRequest->body.rspSem) != 0) {
×
1387
    tscError("failed to post semaphore");
×
1388
  }
1389
  return code;
4,653✔
1390
}
1391

1392

1393
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
210,566,079✔
1394
  switch (msgType) {
210,566,079✔
1395
    case TDMT_MND_CONNECT:
97,720,235✔
1396
      return processConnectRsp;
97,720,235✔
1397
    case TDMT_MND_CREATE_DB:
1,538,757✔
1398
      return processCreateDbRsp;
1,538,757✔
1399
    case TDMT_MND_USE_DB:
97,436,487✔
1400
      return processUseDbRsp;
97,436,487✔
1401
    case TDMT_MND_CREATE_STB:
2,051,838✔
1402
      return processCreateSTableRsp;
2,051,838✔
1403
    case TDMT_MND_DROP_DB:
1,342,823✔
1404
      return processDropDbRsp;
1,342,823✔
1405
    case TDMT_MND_ALTER_STB:
5,900,008✔
1406
      return processAlterStbRsp;
5,900,008✔
1407
    case TDMT_MND_SHOW_VARIABLES:
9,740✔
1408
      return processShowVariablesRsp;
9,740✔
1409
    case TDMT_MND_COMPACT_DB:
36,495✔
1410
      return processCompactDbRsp;
36,495✔
1411
    case TDMT_MND_TRIM_DB:
23,459✔
1412
      return processTrimDbRsp;
23,459✔
1413
    case TDMT_MND_SCAN_DB:
676✔
1414
      return processScanDbRsp;
676✔
1415
    case TDMT_MND_CREATE_TOKEN:
24,159✔
1416
      return processCreateTokenRsp;
24,159✔
1417
    case TDMT_MND_CREATE_TOTP_SECRET:
18,358✔
1418
      return processCreateTotpSecretRsp;
18,358✔
1419
    case TDMT_MND_CREATE_XNODE_TASK:
4,653✔
1420
      return processCreateXnodeTaskRsp;
4,653✔
1421

1422
    default:
4,458,391✔
1423
      return genericRspCallback;
4,458,391✔
1424
  }
1425
}
1426

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc