• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4869

26 Nov 2025 05:46AM UTC coverage: 64.539% (-0.09%) from 64.629%
#4869

push

travis-ci

guanshengliang
Merge branch '3.0' into cover/3.0

771 of 945 new or added lines in 33 files covered. (81.59%)

3214 existing lines in 124 files now uncovered.

158203 of 245129 relevant lines covered (64.54%)

113224023.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.15
/source/client/src/clientMsgHandler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "cmdnodes.h"
21
#include "command.h"
22
#include "os.h"
23
#include "query.h"
24
#include "systable.h"
25
#include "tdatablock.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tname.h"
29
#include "tversion.h"
30

31
extern SClientHbMgr clientHbMgr;
32

33
static void setErrno(SRequestObj* pRequest, int32_t code) {
5,806,515✔
34
  pRequest->code = code;
5,806,515✔
35
  terrno = code;
5,806,515✔
36
}
5,806,515✔
37

38
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
4,610,078✔
39
  SRequestObj* pRequest = param;
4,610,078✔
40
  setErrno(pRequest, code);
4,610,078✔
41

42
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
4,610,078✔
43
    if (removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)) != 0) {
1,267,973✔
44
      tscError("failed to remove meta data for table");
×
45
    }
46
  }
47

48
  taosMemoryFree(pMsg->pEpSet);
4,610,078✔
49
  taosMemoryFree(pMsg->pData);
4,610,078✔
50
  if (pRequest->body.queryFp != NULL) {
4,610,078✔
51
    doRequestCallback(pRequest, code);
4,609,776✔
52
  } else {
53
    if (tsem_post(&pRequest->body.rspSem) != 0) {
302✔
54
      tscError("failed to post semaphore");
×
55
    }
56
  }
57
  return code;
4,610,078✔
58
}
59

60
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
3,741,933✔
61
  SRequestObj* pRequest = acquireRequest(*(int64_t*)param);
3,741,933✔
62
  if (NULL == pRequest) {
3,742,254✔
63
    goto EXIT;
×
64
  }
65

66
  if (code != TSDB_CODE_SUCCESS) {
3,742,254✔
67
    goto End;
12,410✔
68
  }
69

70
  STscObj* pTscObj = pRequest->pTscObj;
3,729,844✔
71

72
  if (NULL == pTscObj->pAppInfo) {
3,729,844✔
73
    code = TSDB_CODE_TSC_DISCONNECTED;
×
74
    goto End;
×
75
  }
76

77
  SConnectRsp connectRsp = {0};
3,729,844✔
78
  if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) {
3,729,844✔
79
    code = TSDB_CODE_TSC_INVALID_VERSION;
×
80
    goto End;
×
81
  }
82

83
  if ((code = taosCheckVersionCompatibleFromStr(td_version, connectRsp.sVer, 3)) != 0) {
3,729,503✔
84
    tscError("version not compatible. client version:%s, server version:%s", td_version, connectRsp.sVer);
×
85
    goto End;
×
86
  }
87

88
  int32_t now = taosGetTimestampSec();
3,729,744✔
89
  int32_t delta = abs(now - connectRsp.svrTimestamp);
3,729,422✔
90
  if (delta > timestampDeltaLimit) {
3,729,422✔
UNCOV
91
    code = TSDB_CODE_TIME_UNSYNCED;
×
UNCOV
92
    tscError("time diff:%ds is too big", delta);
×
93
    goto End;
×
94
  }
95

96
  if (connectRsp.epSet.numOfEps == 0) {
3,729,679✔
97
    code = TSDB_CODE_APP_ERROR;
×
98
    goto End;
×
99
  }
100

101
  int    updateEpSet = 1;
3,729,679✔
102
  SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
3,729,679✔
103
  if (connectRsp.dnodeNum == 1) {
3,729,679✔
104
    SEpSet dstEpSet = connectRsp.epSet;
3,591,031✔
105
    if (srcEpSet.numOfEps == 1) {
3,591,031✔
106
      if (rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
2,245,666✔
107
                            dstEpSet.eps[dstEpSet.inUse].fqdn) != 0) {
2,244,504✔
108
        tscError("failed to set default addr for rpc");
×
109
      }
110
      updateEpSet = 0;
2,244,112✔
111
    }
112
  }
113
  if (updateEpSet == 1 && !isEpsetEqual(&srcEpSet, &connectRsp.epSet)) {
3,729,003✔
114
    SEpSet corEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
1,372,310✔
115

116
    SEpSet* pOrig = &corEpSet;
1,371,926✔
117
    SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
1,371,926✔
118
    SEp*    pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
1,371,926✔
119
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,
1,371,926✔
120
             pOrigEp->fqdn, pOrigEp->port, connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn,
121
             pNewEp->port);
122
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
1,371,926✔
123
  }
124

125
  for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
7,563,703✔
126
    tscDebug("QID:0x%" PRIx64 ", epSet.fqdn[%d]:%s port:%d, conn:0x%" PRIx64, pRequest->requestId, i,
3,835,715✔
127
             connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
128
  }
129

130
  pTscObj->sysInfo = connectRsp.sysInfo;
3,727,988✔
131
  pTscObj->connId = connectRsp.connId;
3,728,361✔
132
  pTscObj->acctId = connectRsp.acctId;
3,728,370✔
133
  tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer));
3,728,764✔
134
  tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer));
3,728,764✔
135

136
  // update the appInstInfo
137
  pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
3,728,489✔
138
  pTscObj->pAppInfo->serverCfg.monitorParas = connectRsp.monitorParas;
3,728,185✔
139
  pTscObj->pAppInfo->serverCfg.enableAuditDelete = connectRsp.enableAuditDelete;
3,727,970✔
140
  tscDebug("monitor paras from connect rsp, clusterId:0x%" PRIx64 ", threshold:%d scope:%d",
3,727,849✔
141
           connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope);
142
  lastClusterId = connectRsp.clusterId;
3,727,849✔
143

144
  pTscObj->connType = connectRsp.connType;
3,727,849✔
145
  pTscObj->passInfo.ver = connectRsp.passVer;
3,725,863✔
146
  pTscObj->authVer = connectRsp.authVer;
3,725,566✔
147
  pTscObj->whiteListInfo.ver = connectRsp.whiteListVer;
3,727,792✔
148

149
  if (taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL) {
3,726,832✔
150
    if (taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo,
1,733,385✔
151
                    POINTER_BYTES) != 0) {
152
      tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
×
153
    } else {
154
#ifdef USE_MONITOR
155
      MonitorSlowLogData data = {0};
1,733,385✔
156
      data.clusterId = pTscObj->pAppInfo->clusterId;
1,733,385✔
157
      data.type = SLOW_LOG_READ_BEGINNIG;
1,733,385✔
158
      (void)monitorPutData2MonitorQueue(data);  // ignore
1,733,385✔
159
      monitorClientSlowQueryInit(connectRsp.clusterId);
1,733,385✔
160
      monitorClientSQLReqInit(connectRsp.clusterId);
1,733,385✔
161
#endif
162
    }
163
  }
164

165
  (void)taosThreadMutexLock(&clientHbMgr.lock);
3,728,990✔
166
  SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
3,729,844✔
167
  if (pAppHbMgr) {
3,729,844✔
168
    if (hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType) != 0) {
3,729,844✔
169
      tscError("QID:0x%" PRIx64 ", failed to register conn to hbMgr", pRequest->requestId);
×
170
    }
171
  } else {
172
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
×
173
    code = TSDB_CODE_TSC_DISCONNECTED;
×
174
    goto End;
×
175
  }
176
  (void)taosThreadMutexUnlock(&clientHbMgr.lock);
3,729,844✔
177

178
  tscDebug("QID:0x%" PRIx64 ", clusterId:0x%" PRIx64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
3,729,844✔
179
           pTscObj->pAppInfo->numOfConns);
180

181
End:
3,603,728✔
182
  if (code != 0) {
3,742,254✔
183
    setErrno(pRequest, code);
12,410✔
184
  }
185
  if (tsem_post(&pRequest->body.rspSem) != 0) {
3,742,254✔
186
    tscError("failed to post semaphore");
×
187
  }
188

189
  if (pRequest) {
3,742,254✔
190
    (void)releaseRequest(pRequest->self);
3,742,254✔
191
  }
192

193
EXIT:
3,219,330✔
194
  taosMemoryFree(param);
3,742,254✔
195
  taosMemoryFree(pMsg->pEpSet);
3,742,254✔
196
  taosMemoryFree(pMsg->pData);
3,742,254✔
197
  return code;
3,742,254✔
198
}
199

200
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
18,793,918✔
201
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
18,793,918✔
202
  if (pMsgSendInfo == NULL) return pMsgSendInfo;
18,793,415✔
203
  pMsgSendInfo->requestObjRefId = pRequest->self;
18,793,415✔
204
  pMsgSendInfo->requestId = pRequest->requestId;
18,794,033✔
205
  pMsgSendInfo->param = pRequest;
18,794,003✔
206
  pMsgSendInfo->msgType = pRequest->type;
18,793,678✔
207
  pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
18,793,710✔
208

209
  pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
18,794,003✔
210
  pMsgSendInfo->fp = getMsgRspHandle(pRequest->type);
18,794,033✔
211
  return pMsgSendInfo;
18,794,111✔
212
}
213

214
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
1,488,131✔
215
  // todo rsp with the vnode id list
216
  SRequestObj* pRequest = param;
1,488,131✔
217
  taosMemoryFree(pMsg->pData);
1,488,131✔
218
  taosMemoryFree(pMsg->pEpSet);
1,488,131✔
219
  if (code != TSDB_CODE_SUCCESS) {
1,488,131✔
220
    setErrno(pRequest, code);
40,795✔
221
  } else {
222
    struct SCatalog* pCatalog = NULL;
1,447,336✔
223
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,447,336✔
224
    if (TSDB_CODE_SUCCESS == code) {
1,447,336✔
225
      STscObj* pTscObj = pRequest->pTscObj;
1,447,336✔
226

227
      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
1,447,336✔
228
                               .requestId = pRequest->requestId,
1,447,336✔
229
                               .requestObjRefId = pRequest->self,
1,447,336✔
230
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
1,447,336✔
231
      char             dbFName[TSDB_DB_FNAME_LEN];
1,316,102✔
232
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
1,447,336✔
233
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
1,447,336✔
234
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info", pRequest->requestId);
×
235
      }
236
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
1,447,336✔
237
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
1,447,336✔
238
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info", pRequest->requestId);
×
239
      }
240
    }
241
  }
242

243
  if (pRequest->body.queryFp) {
1,488,131✔
244
    doRequestCallback(pRequest, code);
1,488,131✔
245
  } else {
246
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
247
      tscError("failed to post semaphore");
×
248
    }
249
  }
250
  return code;
1,488,131✔
251
}
252

253
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
3,017,762✔
254
  SRequestObj* pRequest = param;
3,017,762✔
255
  if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
3,017,762✔
256
      TSDB_CODE_MND_DB_IN_DROPPING == code) {
257
    SUseDbRsp usedbRsp = {0};
1,874✔
258
    if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
1,874✔
259
      tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
1,874✔
260
    }
261
    struct SCatalog* pCatalog = NULL;
1,874✔
262

263
    if (usedbRsp.vgVersion >= 0) {  // cached in local
1,874✔
264
      int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
1,874✔
265
      int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
1,874✔
266
      if (code1 != TSDB_CODE_SUCCESS) {
1,874✔
267
        tscWarn("QID:0x%" PRIx64 ", catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
×
268
                tstrerror(code1));
269
      } else {
270
        if (catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid) != 0) {
1,874✔
271
          tscError("QID:0x%" PRIx64 ", catalogRemoveDB failed, db:%s, uid:%" PRId64, pRequest->requestId, usedbRsp.db,
×
272
                   usedbRsp.uid);
273
        }
274
      }
275
    }
276
    tFreeSUsedbRsp(&usedbRsp);
1,874✔
277
  }
278

279
  if (code != TSDB_CODE_SUCCESS) {
3,017,502✔
280
    taosMemoryFree(pMsg->pData);
4,370✔
281
    taosMemoryFree(pMsg->pEpSet);
4,370✔
282
    setErrno(pRequest, code);
4,370✔
283

284
    if (pRequest->body.queryFp != NULL) {
4,370✔
285
      doRequestCallback(pRequest, pRequest->code);
4,370✔
286

287
    } else {
288
      if (tsem_post(&pRequest->body.rspSem) != 0) {
×
289
        tscError("failed to post semaphore");
×
290
      }
291
    }
292

293
    return code;
4,370✔
294
  }
295

296
  SUseDbRsp usedbRsp = {0};
3,013,132✔
297
  if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
3,013,132✔
298
    tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
×
299
  }
300

301
  if (strlen(usedbRsp.db) == 0) {
3,013,392✔
302
    taosMemoryFree(pMsg->pData);
×
303
    taosMemoryFree(pMsg->pEpSet);
×
304

305
    if (usedbRsp.errCode != 0) {
×
306
      return usedbRsp.errCode;
×
307
    } else {
308
      return TSDB_CODE_APP_ERROR;
×
309
    }
310
  }
311

312
  tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
3,013,216✔
313
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
5,548,750✔
314
    SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
2,535,777✔
315
    if (pInfo == NULL) {
2,535,618✔
316
      continue;
×
317
    }
318
    tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
2,535,618✔
319
    for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
5,260,066✔
320
      tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
2,724,448✔
321
    }
322
  }
323

324
  SName name = {0};
3,012,973✔
325
  if (tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB) != TSDB_CODE_SUCCESS) {
3,012,973✔
326
    tscError("QID:0x%" PRIx64 ", failed to parse db name:%s", pRequest->requestId, usedbRsp.db);
×
327
  }
328

329
  SUseDbOutput output = {0};
3,013,132✔
330
  code = queryBuildUseDbOutput(&output, &usedbRsp);
3,013,132✔
331
  if (code != 0) {
3,013,132✔
332
    terrno = code;
×
333
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
×
334

335
    tscError("QID:0x%" PRIx64 ", failed to build use db output since %s", pRequest->requestId, terrstr());
×
336
  } else if (output.dbVgroup && output.dbVgroup->vgHash) {
3,013,132✔
337
    struct SCatalog* pCatalog = NULL;
1,189,070✔
338

339
    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,189,070✔
340
    if (code1 != TSDB_CODE_SUCCESS) {
1,189,070✔
341
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
×
342
              tstrerror(code1));
343
    } else {
344
      if (catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup) != 0) {
1,189,070✔
345
        tscError("QID:0x%" PRIx64 ", failed to update db vg info, db:%s, dbId:%" PRId64, pRequest->requestId, output.db,
×
346
                 output.dbId);
347
      }
348
      output.dbVgroup = NULL;
1,189,070✔
349
    }
350
  }
351

352
  taosMemoryFreeClear(output.dbVgroup);
3,013,132✔
353
  tFreeSUsedbRsp(&usedbRsp);
3,013,132✔
354

355
  char db[TSDB_DB_NAME_LEN] = {0};
3,012,684✔
356
  if (tNameGetDbName(&name, db) != TSDB_CODE_SUCCESS) {
3,012,684✔
357
    tscError("QID:0x%" PRIx64 ", failed to get db name since %s", pRequest->requestId, tstrerror(code));
×
358
  }
359

360
  setConnectionDB(pRequest->pTscObj, db);
3,012,996✔
361

362
  taosMemoryFree(pMsg->pData);
3,012,996✔
363
  taosMemoryFree(pMsg->pEpSet);
3,013,290✔
364

365
  if (pRequest->body.queryFp != NULL) {
3,012,762✔
366
    doRequestCallback(pRequest, pRequest->code);
3,012,763✔
367
  } else {
368
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
369
      tscError("failed to post semaphore");
×
370
    }
371
  }
372
  return 0;
3,013,132✔
373
}
374

375
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
2,237,311✔
376
  if (pMsg == NULL) {
2,237,311✔
377
    tscError("processCreateSTableRsp: invalid input param, pMsg is NULL");
×
378
    return TSDB_CODE_TSC_INVALID_INPUT;
×
379
  }
380
  if (param == NULL) {
2,237,311✔
381
    taosMemoryFree(pMsg->pEpSet);
×
382
    taosMemoryFree(pMsg->pData);
×
383
    tscError("processCreateSTableRsp: invalid input param, param is NULL");
×
384
    return TSDB_CODE_TSC_INVALID_INPUT;
×
385
  }
386

387
  SRequestObj* pRequest = param;
2,237,311✔
388

389
  if (code != TSDB_CODE_SUCCESS) {
2,237,311✔
390
    setErrno(pRequest, code);
32,691✔
391
  } else {
392
    SMCreateStbRsp createRsp = {0};
2,204,620✔
393
    SDecoder       coder = {0};
2,204,620✔
394
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
2,204,620✔
395
    if (pMsg->len > 0) {
2,204,620✔
396
      code = tDecodeSMCreateStbRsp(&coder, &createRsp);  // pMsg->len == 0
2,190,439✔
397
      if (code != TSDB_CODE_SUCCESS) {
2,190,439✔
398
        setErrno(pRequest, code);
×
399
      }
400
    }
401
    tDecoderClear(&coder);
2,204,620✔
402

403
    pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
2,204,620✔
404
    pRequest->body.resInfo.execRes.res = createRsp.pMeta;
2,204,620✔
405
  }
406

407
  taosMemoryFree(pMsg->pEpSet);
2,237,311✔
408
  taosMemoryFree(pMsg->pData);
2,237,311✔
409

410
  if (pRequest->body.queryFp != NULL) {
2,237,311✔
411
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
1,845,626✔
412

413
    if (code == TSDB_CODE_SUCCESS) {
1,845,626✔
414
      SCatalog* pCatalog = NULL;
1,841,068✔
415
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,841,068✔
416
      if (pRes->res != NULL) {
1,841,068✔
417
        ret = handleCreateTbExecRes(pRes->res, pCatalog);
1,833,246✔
418
      }
419

420
      if (ret != TSDB_CODE_SUCCESS) {
1,841,068✔
421
        code = ret;
×
422
      }
423
    }
424

425
    doRequestCallback(pRequest, code);
1,845,626✔
426
  } else {
427
    if (tsem_post(&pRequest->body.rspSem) != 0) {
391,685✔
428
      tscError("failed to post semaphore");
×
429
    }
430
  }
431
  return code;
2,237,311✔
432
}
433

434
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
1,320,624✔
435
  SRequestObj* pRequest = param;
1,320,624✔
436
  if (code != TSDB_CODE_SUCCESS) {
1,320,624✔
437
    setErrno(pRequest, code);
11,373✔
438
  } else {
439
    SDropDbRsp dropdbRsp = {0};
1,309,251✔
440
    if (tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp) != 0) {
1,309,251✔
441
      tscError("QID:0x%" PRIx64 ", deserialize SDropDbRsp failed", pRequest->requestId);
×
442
    }
443
    struct SCatalog* pCatalog = NULL;
1,309,251✔
444
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
1,309,251✔
445
    if (TSDB_CODE_SUCCESS == code) {
1,309,251✔
446
      if (catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid) != 0) {
1,309,251✔
447
        tscError("QID:0x%" PRIx64 ", failed to remove db:%s", pRequest->requestId, dropdbRsp.db);
×
448
      }
449
      STscObj* pTscObj = pRequest->pTscObj;
1,309,251✔
450

451
      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
1,309,251✔
452
                               .requestId = pRequest->requestId,
1,309,251✔
453
                               .requestObjRefId = pRequest->self,
1,309,251✔
454
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
1,309,251✔
455
      char             dbFName[TSDB_DB_FNAME_LEN] = {0};
1,309,251✔
456
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
1,309,251✔
457
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != TSDB_CODE_SUCCESS) {
1,309,251✔
458
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
×
459
      }
460
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
1,309,251✔
461
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
1,309,251✔
462
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
×
463
      }
464
    }
465
  }
466

467
  taosMemoryFree(pMsg->pData);
1,320,624✔
468
  taosMemoryFree(pMsg->pEpSet);
1,320,624✔
469

470
  if (pRequest->body.queryFp != NULL) {
1,320,624✔
471
    doRequestCallback(pRequest, code);
1,320,624✔
472
  } else {
473
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
474
      tscError("failed to post semaphore");
×
475
    }
476
  }
477
  return code;
1,320,624✔
478
}
479

480
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
5,998,107✔
481
  SRequestObj* pRequest = param;
5,998,107✔
482
  if (code != TSDB_CODE_SUCCESS) {
5,998,107✔
483
    setErrno(pRequest, code);
1,077,882✔
484
  } else {
485
    SMAlterStbRsp alterRsp = {0};
4,920,225✔
486
    SDecoder      coder = {0};
4,920,225✔
487
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
4,920,225✔
488
    if (pMsg->len > 0) {
4,920,225✔
489
      code = tDecodeSMAlterStbRsp(&coder, &alterRsp);  // pMsg->len == 0
4,897,578✔
490
      if (code != TSDB_CODE_SUCCESS) {
4,897,578✔
491
        setErrno(pRequest, code);
×
492
      }
493
    }
494
    tDecoderClear(&coder);
4,920,225✔
495

496
    pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
4,920,225✔
497
    pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
4,920,225✔
498
  }
499

500
  taosMemoryFree(pMsg->pData);
5,998,107✔
501
  taosMemoryFree(pMsg->pEpSet);
5,998,107✔
502

503
  if (pRequest->body.queryFp != NULL) {
5,998,107✔
504
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
5,998,107✔
505

506
    if (code == TSDB_CODE_SUCCESS) {
5,998,107✔
507
      SCatalog* pCatalog = NULL;
4,920,225✔
508
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
4,920,225✔
509
      if (pRes->res != NULL) {
4,920,225✔
510
        ret = handleAlterTbExecRes(pRes->res, pCatalog);
4,897,578✔
511
      }
512

513
      if (ret != TSDB_CODE_SUCCESS) {
4,920,225✔
514
        code = ret;
×
515
      }
516
    }
517

518
    doRequestCallback(pRequest, code);
5,998,107✔
519
  } else {
520
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
521
      tscError("failed to post semaphore");
×
522
    }
523
  }
524
  return code;
5,998,107✔
525
}
526

527
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
44,942✔
528
  int32_t      code = 0;
44,942✔
529
  int32_t      line = 0;
44,942✔
530
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
44,942✔
531
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
44,942✔
532
  pBlock->info.hasVarCol = true;
44,942✔
533

534
  pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
44,942✔
535
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
44,942✔
536
  SColumnInfoData infoData = {0};
44,942✔
537
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
44,942✔
538
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN;
44,942✔
539
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
89,884✔
540

541
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
44,942✔
542
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN;
44,942✔
543
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
89,884✔
544

545
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
44,942✔
546
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD3_LEN;
44,942✔
547
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
89,884✔
548

549
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
44,942✔
550
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD4_LEN;
44,942✔
551
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
89,884✔
552

553
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
44,942✔
554
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD5_LEN;
44,942✔
555
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
89,884✔
556

557
  int32_t numOfCfg = taosArrayGetSize(pVars);
44,942✔
558
  code = blockDataEnsureCapacity(pBlock, numOfCfg);
44,942✔
559
  TSDB_CHECK_CODE(code, line, END);
44,942✔
560

561
  for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
4,568,757✔
562
    SVariablesInfo* pInfo = taosArrayGet(pVars, i);
4,523,815✔
563
    TSDB_CHECK_NULL(pInfo, code, line, END, terrno);
4,523,815✔
564

565
    char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
4,523,815✔
566
    STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
4,523,815✔
567
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
4,523,815✔
568
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
4,523,815✔
569
    code = colDataSetVal(pColInfo, i, name, false);
4,523,815✔
570
    TSDB_CHECK_CODE(code, line, END);
4,523,815✔
571

572
    char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
4,523,815✔
573
    STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
4,523,815✔
574
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
4,523,815✔
575
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
4,523,815✔
576
    code = colDataSetVal(pColInfo, i, value, false);
4,523,815✔
577
    TSDB_CHECK_CODE(code, line, END);
4,523,815✔
578

579
    char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
4,523,815✔
580
    STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
4,523,815✔
581
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
4,523,815✔
582
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
4,523,815✔
583
    code = colDataSetVal(pColInfo, i, scope, false);
4,523,815✔
584
    TSDB_CHECK_CODE(code, line, END);
4,523,815✔
585

586
    char category[TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE] = {0};
4,523,815✔
587
    STR_WITH_MAXSIZE_TO_VARSTR(category, pInfo->category, TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE);
4,523,815✔
588
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
4,523,815✔
589
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
4,523,815✔
590
    code = colDataSetVal(pColInfo, i, category, false);
4,523,815✔
591
    TSDB_CHECK_CODE(code, line, END);
4,523,815✔
592

593
    char info[TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE] = {0};
4,523,815✔
594
    STR_WITH_MAXSIZE_TO_VARSTR(info, pInfo->info, TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE);
4,523,815✔
595
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
4,523,815✔
596
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
4,523,815✔
597
    code = colDataSetVal(pColInfo, i, info, false);
4,523,815✔
598
    TSDB_CHECK_CODE(code, line, END);
4,523,815✔
599
  }
600

601
  pBlock->info.rows = numOfCfg;
44,942✔
602

603
  *block = pBlock;
44,942✔
604
  return code;
44,942✔
605

606
END:
×
607
  taosArrayDestroy(pBlock->pDataBlock);
×
608
  taosMemoryFree(pBlock);
×
609
  return code;
×
610
}
611

612
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
44,942✔
613
  SSDataBlock* pBlock = NULL;
44,942✔
614
  int32_t      code = buildShowVariablesBlock(pVars, &pBlock);
44,942✔
615
  if (code) {
44,942✔
616
    return code;
×
617
  }
618

619
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
44,942✔
620
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
44,942✔
621
  *pRsp = taosMemoryCalloc(1, rspSize);
44,942✔
622
  if (NULL == *pRsp) {
44,942✔
623
    code = terrno;
×
624
    goto _exit;
×
625
  }
626

627
  (*pRsp)->useconds = 0;
44,942✔
628
  (*pRsp)->completed = 1;
44,942✔
629
  (*pRsp)->precision = 0;
44,942✔
630
  (*pRsp)->compressed = 0;
44,942✔
631

632
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
44,942✔
633
  (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
44,942✔
634

635
  int32_t len = 0;
44,942✔
636
  if ((*pRsp)->numOfRows > 0) {
44,942✔
637
    len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SHOW_VARIABLES_RESULT_COLS);
44,130✔
638
    if (len < 0) {
44,130✔
639
      uError("buildShowVariablesRsp error, len:%d", len);
×
640
      code = terrno;
×
641
      goto _exit;
×
642
    }
643
    SET_PAYLOAD_LEN((*pRsp)->data, len, len);
44,130✔
644

645
    int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
44,130✔
646
    (*pRsp)->payloadLen = htonl(payloadLen);
44,130✔
647
    (*pRsp)->compLen = htonl(payloadLen);
44,130✔
648

649
    if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
44,130✔
650
      uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
×
651
             (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
652
      code = TSDB_CODE_TSC_INVALID_INPUT;
×
653
      goto _exit;
×
654
    }
655
  }
656

657
  blockDataDestroy(pBlock);
44,942✔
658
  pBlock = NULL;
44,942✔
659

660
  return TSDB_CODE_SUCCESS;
44,942✔
661
_exit:
×
662
  if (*pRsp) {
×
663
    taosMemoryFree(*pRsp);
×
664
    *pRsp = NULL;
×
665
  }
666
  if (pBlock) {
×
667
    blockDataDestroy(pBlock);
×
668
    pBlock = NULL;
×
669
  }
670
  return code;
×
671
}
672

673
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
44,942✔
674
  SRequestObj* pRequest = param;
44,942✔
675
  if (code != TSDB_CODE_SUCCESS) {
44,942✔
676
    setErrno(pRequest, code);
×
677
  } else {
678
    SShowVariablesRsp  rsp = {0};
44,942✔
679
    SRetrieveTableRsp* pRes = NULL;
44,942✔
680
    code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp);
44,942✔
681
    if (TSDB_CODE_SUCCESS == code) {
44,942✔
682
      code = buildShowVariablesRsp(rsp.variables, &pRes);
44,942✔
683
    }
684
    if (TSDB_CODE_SUCCESS == code) {
44,942✔
685
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->stmtBindVersion > 0);
44,942✔
686
    }
687

688
    if (code != 0) {
44,942✔
689
      pRequest->body.resInfo.pRspMsg = NULL;
×
690
      taosMemoryFree(pRes);
×
691
    }
692
    tFreeSShowVariablesRsp(&rsp);
44,942✔
693
  }
694

695
  taosMemoryFree(pMsg->pData);
44,942✔
696
  taosMemoryFree(pMsg->pEpSet);
44,942✔
697

698
  if (pRequest->body.queryFp != NULL) {
44,942✔
699
    doRequestCallback(pRequest, code);
44,942✔
700
  } else {
701
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
702
      tscError("failed to post semaphore");
×
703
    }
704
  }
705
  return code;
44,942✔
706
}
707

708
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
59,820✔
709
  int32_t      code = 0;
59,820✔
710
  int32_t      line = 0;
59,820✔
711
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
59,820✔
712
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
59,820✔
713
  pBlock->info.hasVarCol = true;
59,820✔
714

715
  pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
59,820✔
716
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
59,820✔
717
  SColumnInfoData infoData = {0};
59,820✔
718
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
59,820✔
719
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
59,820✔
720
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
119,640✔
721

722
  infoData.info.type = TSDB_DATA_TYPE_INT;
59,820✔
723
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
59,820✔
724
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
119,640✔
725

726
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
59,820✔
727
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
59,820✔
728
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
119,640✔
729

730
  code = blockDataEnsureCapacity(pBlock, 1);
59,820✔
731
  TSDB_CHECK_CODE(code, line, END);
59,820✔
732

733
  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
59,820✔
734
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
59,820✔
735
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
59,820✔
736
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
59,820✔
737
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
59,820✔
738
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);
59,820✔
739

740
  char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
59,820✔
741
  char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
59,820✔
742
  if (pRsp->bAccepted) {
59,820✔
743
    STR_TO_VARSTR(result, "accepted");
59,820✔
744
    code = colDataSetVal(pResultCol, 0, result, false);
59,820✔
745
    TSDB_CHECK_CODE(code, line, END);
59,820✔
746
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
59,820✔
747
    TSDB_CHECK_CODE(code, line, END);
59,820✔
748
    STR_TO_VARSTR(reason, "success");
59,820✔
749
    code = colDataSetVal(pReasonCol, 0, reason, false);
59,820✔
750
    TSDB_CHECK_CODE(code, line, END);
59,820✔
751
  } else {
752
    STR_TO_VARSTR(result, "rejected");
×
753
    code = colDataSetVal(pResultCol, 0, result, false);
×
754
    TSDB_CHECK_CODE(code, line, END);
×
755
    colDataSetNULL(pIdCol, 0);
756
    STR_TO_VARSTR(reason, "compaction is ongoing");
×
757
    code = colDataSetVal(pReasonCol, 0, reason, false);
×
758
    TSDB_CHECK_CODE(code, line, END);
×
759
  }
760
  pBlock->info.rows = 1;
59,820✔
761

762
  *block = pBlock;
59,820✔
763

764
  return TSDB_CODE_SUCCESS;
59,820✔
765
END:
×
766
  taosMemoryFree(pBlock);
×
767
  taosArrayDestroy(pBlock->pDataBlock);
×
768
  return code;
×
769
}
770

771
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
59,820✔
772
  SSDataBlock* pBlock = NULL;
59,820✔
773
  int32_t      code = buildCompactDbBlock(pCompactDb, &pBlock);
59,820✔
774
  if (code) {
59,820✔
775
    return code;
×
776
  }
777

778
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
59,820✔
779
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
59,820✔
780
  *pRsp = taosMemoryCalloc(1, rspSize);
59,820✔
781
  if (NULL == *pRsp) {
59,820✔
782
    code = terrno;
×
783
    goto _exit;
×
784
  }
785

786
  (*pRsp)->useconds = 0;
59,820✔
787
  (*pRsp)->completed = 1;
59,820✔
788
  (*pRsp)->precision = 0;
59,820✔
789
  (*pRsp)->compressed = 0;
59,820✔
790
  (*pRsp)->compLen = 0;
59,820✔
791
  (*pRsp)->payloadLen = 0;
59,820✔
792
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
59,820✔
793
  (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
59,820✔
794

795
  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, COMPACT_DB_RESULT_COLS);
59,820✔
796
  if (len < 0) {
59,820✔
797
    uError("buildRetriveTableRspForCompactDb error, len:%d", len);
×
798
    code = terrno;
×
799
    goto _exit;
×
800
  }
801
  blockDataDestroy(pBlock);
59,820✔
802

803
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);
59,820✔
804

805
  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
59,820✔
806
  (*pRsp)->payloadLen = htonl(payloadLen);
59,820✔
807
  (*pRsp)->compLen = htonl(payloadLen);
59,820✔
808

809
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
59,820✔
810
    uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
×
811
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
812
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
813
    goto _exit;
×
814
  }
815

816
  return TSDB_CODE_SUCCESS;
59,820✔
817
_exit:
×
818
  if (*pRsp) {
×
819
    taosMemoryFree(*pRsp);
×
820
    *pRsp = NULL;
×
821
  }
822
  if (pBlock) {
×
823
    blockDataDestroy(pBlock);
×
824
    pBlock = NULL;
×
825
  }
826
  return code;
×
827
}
828

829
int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
33,510✔
830
  SRequestObj* pRequest = param;
33,510✔
831
  if (code != TSDB_CODE_SUCCESS) {
33,510✔
832
    setErrno(pRequest, code);
2,634✔
833
  } else {
834
    SCompactDbRsp      rsp = {0};
30,876✔
835
    SRetrieveTableRsp* pRes = NULL;
30,876✔
836
    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, &rsp);
30,876✔
837
    if (TSDB_CODE_SUCCESS == code) {
30,876✔
838
      code = buildRetriveTableRspForCompactDb(&rsp, &pRes);
30,876✔
839
    }
840
    if (TSDB_CODE_SUCCESS == code) {
30,876✔
841
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->stmtBindVersion > 0);
30,876✔
842
    }
843

844
    if (code != 0) {
30,876✔
845
      pRequest->body.resInfo.pRspMsg = NULL;
×
846
      taosMemoryFree(pRes);
×
847
    }
848
  }
849

850
  taosMemoryFree(pMsg->pData);
33,510✔
851
  taosMemoryFree(pMsg->pEpSet);
33,510✔
852

853
  if (pRequest->body.queryFp != NULL) {
33,510✔
854
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
33,510✔
855
  } else {
856
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
857
      tscError("failed to post semaphore");
×
858
    }
859
  }
860
  return code;
33,510✔
861
}
862

863
static int32_t buildScanDbBlock(SScanDbRsp* pRsp, SSDataBlock** block) {
520✔
864
  int32_t      code = 0;
520✔
865
  int32_t      line = 0;
520✔
866
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
520✔
867
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
520✔
868
  pBlock->info.hasVarCol = true;
520✔
869

870
  pBlock->pDataBlock = taosArrayInit(SCAN_DB_RESULT_COLS, sizeof(SColumnInfoData));
520✔
871
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
520✔
872
  SColumnInfoData infoData = {0};
520✔
873
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
520✔
874
  infoData.info.bytes = SCAN_DB_RESULT_FIELD1_LEN;
520✔
875
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
1,040✔
876

877
  infoData.info.type = TSDB_DATA_TYPE_INT;
520✔
878
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
520✔
879
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
1,040✔
880

881
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
520✔
882
  infoData.info.bytes = SCAN_DB_RESULT_FIELD3_LEN;
520✔
883
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
1,040✔
884

885
  code = blockDataEnsureCapacity(pBlock, 1);
520✔
886
  TSDB_CHECK_CODE(code, line, END);
520✔
887

888
  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
520✔
889
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
520✔
890
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
520✔
891
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
520✔
892
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
520✔
893
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);
520✔
894

895
  char result[SCAN_DB_RESULT_FIELD1_LEN] = {0};
520✔
896
  char reason[SCAN_DB_RESULT_FIELD3_LEN] = {0};
520✔
897
  if (pRsp->bAccepted) {
520✔
898
    STR_TO_VARSTR(result, "accepted");
520✔
899
    code = colDataSetVal(pResultCol, 0, result, false);
520✔
900
    TSDB_CHECK_CODE(code, line, END);
520✔
901
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->scanId, false);
520✔
902
    TSDB_CHECK_CODE(code, line, END);
520✔
903
    STR_TO_VARSTR(reason, "success");
520✔
904
    code = colDataSetVal(pReasonCol, 0, reason, false);
520✔
905
    TSDB_CHECK_CODE(code, line, END);
520✔
906
  } else {
907
    STR_TO_VARSTR(result, "rejected");
×
908
    code = colDataSetVal(pResultCol, 0, result, false);
×
909
    TSDB_CHECK_CODE(code, line, END);
×
910
    colDataSetNULL(pIdCol, 0);
911
    STR_TO_VARSTR(reason, "scan is ongoing");
×
912
    code = colDataSetVal(pReasonCol, 0, reason, false);
×
913
    TSDB_CHECK_CODE(code, line, END);
×
914
  }
915
  pBlock->info.rows = 1;
520✔
916

917
  *block = pBlock;
520✔
918

919
  return TSDB_CODE_SUCCESS;
520✔
920
END:
×
921
  taosMemoryFree(pBlock);
×
922
  taosArrayDestroy(pBlock->pDataBlock);
×
923
  return code;
×
924
}
925

926
static int32_t buildRetriveTableRspForScanDb(SScanDbRsp* pScanDb, SRetrieveTableRsp** pRsp) {
520✔
927
  SSDataBlock* pBlock = NULL;
520✔
928
  int32_t      code = buildScanDbBlock(pScanDb, &pBlock);
520✔
929
  if (code) {
520✔
930
    return code;
×
931
  }
932

933
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
520✔
934
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
520✔
935
  *pRsp = taosMemoryCalloc(1, rspSize);
520✔
936
  if (NULL == *pRsp) {
520✔
937
    code = terrno;
×
938
    goto _exit;
×
939
  }
940

941
  (*pRsp)->useconds = 0;
520✔
942
  (*pRsp)->completed = 1;
520✔
943
  (*pRsp)->precision = 0;
520✔
944
  (*pRsp)->compressed = 0;
520✔
945
  (*pRsp)->compLen = 0;
520✔
946
  (*pRsp)->payloadLen = 0;
520✔
947
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
520✔
948
  (*pRsp)->numOfCols = htonl(SCAN_DB_RESULT_COLS);
520✔
949

950
  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SCAN_DB_RESULT_COLS);
520✔
951
  if (len < 0) {
520✔
952
    uError("%s error, len:%d", __func__, len);
×
953
    code = terrno;
×
954
    goto _exit;
×
955
  }
956
  blockDataDestroy(pBlock);
520✔
957

958
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);
520✔
959

960
  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
520✔
961
  (*pRsp)->payloadLen = htonl(payloadLen);
520✔
962
  (*pRsp)->compLen = htonl(payloadLen);
520✔
963

964
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
520✔
965
    uError("%s error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, __func__, len,
×
966
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
967
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
968
    goto _exit;
×
969
  }
970

971
  return TSDB_CODE_SUCCESS;
520✔
972
_exit:
×
973
  if (*pRsp) {
×
974
    taosMemoryFree(*pRsp);
×
975
    *pRsp = NULL;
×
976
  }
977
  if (pBlock) {
×
978
    blockDataDestroy(pBlock);
×
979
    pBlock = NULL;
×
980
  }
981
  return code;
×
982
}
983

984
static int32_t processScanDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
780✔
985
  SRequestObj* pRequest = param;
780✔
986
  if (code != TSDB_CODE_SUCCESS) {
780✔
987
    setErrno(pRequest, code);
260✔
988
  } else {
989
    SScanDbRsp         rsp = {0};
520✔
990
    SRetrieveTableRsp* pRes = NULL;
520✔
991
    code = tDeserializeSScanDbRsp(pMsg->pData, pMsg->len, &rsp);
520✔
992
    if (TSDB_CODE_SUCCESS == code) {
520✔
993
      code = buildRetriveTableRspForScanDb(&rsp, &pRes);
520✔
994
    }
995
    if (TSDB_CODE_SUCCESS == code) {
520✔
996
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->stmtBindVersion > 0);
520✔
997
    }
998

999
    if (code != 0) {
520✔
1000
      pRequest->body.resInfo.pRspMsg = NULL;
×
1001
      taosMemoryFree(pRes);
×
1002
    }
1003
  }
1004

1005
  taosMemoryFree(pMsg->pData);
780✔
1006
  taosMemoryFree(pMsg->pEpSet);
780✔
1007

1008
  if (pRequest->body.queryFp != NULL) {
780✔
1009
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
780✔
1010
  } else {
1011
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
1012
      tscError("failed to post semaphore");
×
1013
    }
1014
  }
1015
  return code;
780✔
1016
}
1017

1018
int32_t processTrimDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
42,966✔
1019
  SRequestObj* pRequest = param;
42,966✔
1020
  if (code != TSDB_CODE_SUCCESS) {
42,966✔
1021
    setErrno(pRequest, code);
14,022✔
1022
  } else {
1023
    STrimDbRsp         rsp = {0};
28,944✔
1024
    SRetrieveTableRsp* pRes = NULL;
28,944✔
1025
    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, (SCompactDbRsp*)&rsp);
28,944✔
1026
    if (TSDB_CODE_SUCCESS == code) {
28,944✔
1027
      code = buildRetriveTableRspForCompactDb(&rsp, &pRes);
28,944✔
1028
    }
1029
    if (TSDB_CODE_SUCCESS == code) {
28,944✔
1030
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, pRequest->stmtBindVersion > 0);
28,944✔
1031
    }
1032

1033
    if (code != 0) {
28,944✔
1034
      pRequest->body.resInfo.pRspMsg = NULL;
×
1035
      taosMemoryFree(pRes);
×
1036
    }
1037
  }
1038

1039
  taosMemoryFree(pMsg->pData);
42,966✔
1040
  taosMemoryFree(pMsg->pEpSet);
42,966✔
1041

1042
  if (pRequest->body.queryFp != NULL) {
42,966✔
1043
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
42,966✔
1044
  } else {
1045
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
1046
      tscError("failed to post semaphore");
×
1047
    }
1048
  }
1049
  return code;
42,966✔
1050
}
1051

1052
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
22,534,749✔
1053
  switch (msgType) {
22,534,749✔
1054
    case TDMT_MND_CONNECT:
3,742,254✔
1055
      return processConnectRsp;
3,742,254✔
1056
    case TDMT_MND_CREATE_DB:
1,488,131✔
1057
      return processCreateDbRsp;
1,488,131✔
1058
    case TDMT_MND_USE_DB:
3,017,349✔
1059
      return processUseDbRsp;
3,017,349✔
1060
    case TDMT_MND_CREATE_STB:
2,237,018✔
1061
      return processCreateSTableRsp;
2,237,018✔
1062
    case TDMT_MND_DROP_DB:
1,320,624✔
1063
      return processDropDbRsp;
1,320,624✔
1064
    case TDMT_MND_ALTER_STB:
5,998,107✔
1065
      return processAlterStbRsp;
5,998,107✔
1066
    case TDMT_MND_SHOW_VARIABLES:
44,942✔
1067
      return processShowVariablesRsp;
44,942✔
1068
    case TDMT_MND_COMPACT_DB:
33,510✔
1069
      return processCompactDbRsp;
33,510✔
1070
    case TDMT_MND_TRIM_DB:
42,966✔
1071
      return processTrimDbRsp;
42,966✔
1072
    case TDMT_MND_SCAN_DB:
780✔
1073
      return processScanDbRsp;
780✔
1074
    default:
4,609,068✔
1075
      return genericRspCallback;
4,609,068✔
1076
  }
1077
}
1078

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc