• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3519

05 Nov 2024 11:19AM UTC coverage: 57.706% (+8.4%) from 49.32%
#3519

push

travis-ci

web-flow
Merge pull request #28652 from taosdata/fix/3_liaohj

refactor: always successfully put the retrieve msg

109445 of 245179 branches covered (44.64%)

Branch coverage included in aggregate %.

187435 of 269288 relevant lines covered (69.6%)

12869818.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.92
/source/client/src/clientMsgHandler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientMonitor.h"
19
#include "clientLog.h"
20
#include "cmdnodes.h"
21
#include "os.h"
22
#include "query.h"
23
#include "systable.h"
24
#include "tdatablock.h"
25
#include "tdef.h"
26
#include "tglobal.h"
27
#include "tname.h"
28
#include "tversion.h"
29
#include "command.h"
30

31
extern SClientHbMgr clientHbMgr;
32

33
static void setErrno(SRequestObj* pRequest, int32_t code) {
11,521✔
34
  pRequest->code = code;
11,521✔
35
  terrno = code;
11,521✔
36
}
11,521✔
37

38
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
9,073✔
39
  SRequestObj* pRequest = param;
9,073✔
40
  setErrno(pRequest, code);
9,073✔
41

42
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
9,073!
43
    if (removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)) != 0){
2,361!
44
      tscError("failed to remove meta data for table");
×
45
    }
46
  }
47

48
  taosMemoryFree(pMsg->pEpSet);
9,073✔
49
  taosMemoryFree(pMsg->pData);
9,073✔
50
  if (pRequest->body.queryFp != NULL) {
9,073✔
51
    doRequestCallback(pRequest, code);
9,067✔
52
  } else {
53
    if (tsem_post(&pRequest->body.rspSem) != 0){
6!
54
      tscError("failed to post semaphore");
×
55
    }
56
  }
57
  return code;
9,073✔
58
}
59

60
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
12,526✔
61
  SRequestObj* pRequest = acquireRequest(*(int64_t*)param);
12,526✔
62
  if (NULL == pRequest) {
12,526!
63
    goto EXIT;
×
64
  }
65

66
  if (code != TSDB_CODE_SUCCESS) {
12,526✔
67
    goto End;
143✔
68
  }
69

70
  STscObj* pTscObj = pRequest->pTscObj;
12,383✔
71

72
  if (NULL == pTscObj->pAppInfo) {
12,383!
73
    code = TSDB_CODE_TSC_DISCONNECTED;
×
74
    goto End;
×
75
  }
76

77
  SConnectRsp connectRsp = {0};
12,383✔
78
  if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) {
12,383!
79
    code = TSDB_CODE_TSC_INVALID_VERSION;
×
80
    goto End;
×
81
  }
82

83
  if ((code = taosCheckVersionCompatibleFromStr(td_version, connectRsp.sVer, 3)) != 0) {
12,383!
84
    tscError("version not compatible. client version: %s, server version: %s", td_version, connectRsp.sVer);
×
85
    goto End;
×
86
  }
87

88
  int32_t now = taosGetTimestampSec();
12,383✔
89
  int32_t delta = abs(now - connectRsp.svrTimestamp);
12,383✔
90
  if (delta > timestampDeltaLimit) {
12,383!
91
    code = TSDB_CODE_TIME_UNSYNCED;
×
92
    tscError("time diff:%ds is too big", delta);
×
93
    goto End;
×
94
  }
95

96
  if (connectRsp.epSet.numOfEps == 0) {
12,383!
97
    code = TSDB_CODE_APP_ERROR;
×
98
    goto End;
×
99
  }
100

101
  int updateEpSet = 1;
12,383✔
102
  if (connectRsp.dnodeNum == 1) {
12,383✔
103
    SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
9,748✔
104
    SEpSet dstEpSet = connectRsp.epSet;
9,748✔
105
    if (srcEpSet.numOfEps == 1) {
9,748✔
106
      if (rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
7,440!
107
                        dstEpSet.eps[dstEpSet.inUse].fqdn) != 0){
7,440✔
108
        tscError("failed to set default addr for rpc");
×
109
      }
110
      updateEpSet = 0;
7,440✔
111
    }
112
  }
113
  if (updateEpSet == 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
12,383✔
114
    SEpSet corEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
2,493✔
115

116
    SEpSet* pOrig = &corEpSet;
2,493✔
117
    SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
2,493✔
118
    SEp*    pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
2,493✔
119
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,
2,493✔
120
             pOrigEp->fqdn, pOrigEp->port, connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn,
121
             pNewEp->port);
122
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
2,493✔
123
  }
124

125
  for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
25,130✔
126
    tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
12,747✔
127
             connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
128
  }
129

130
  pTscObj->sysInfo = connectRsp.sysInfo;
12,383✔
131
  pTscObj->connId = connectRsp.connId;
12,383✔
132
  pTscObj->acctId = connectRsp.acctId;
12,383✔
133
  tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer));
12,383✔
134
  tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer));
12,383✔
135

136
  // update the appInstInfo
137
  pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
12,383✔
138
  pTscObj->pAppInfo->monitorParas = connectRsp.monitorParas;
12,383✔
139
  tscDebug("[monitor] paras from connect rsp, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
12,383✔
140
           connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope);
141
  lastClusterId = connectRsp.clusterId;
12,383✔
142

143
  pTscObj->connType = connectRsp.connType;
12,383✔
144
  pTscObj->passInfo.ver = connectRsp.passVer;
12,383✔
145
  pTscObj->authVer = connectRsp.authVer;
12,383✔
146
  pTscObj->whiteListInfo.ver = connectRsp.whiteListVer;
12,383✔
147

148
  if(taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL){
12,383✔
149
    if(taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo, POINTER_BYTES) != 0){
4,057!
150
      tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
×
151
    }else{
152
      MonitorSlowLogData data = {0};
4,057✔
153
      data.clusterId = pTscObj->pAppInfo->clusterId;
4,057✔
154
      data.type = SLOW_LOG_READ_BEGINNIG;
4,057✔
155
      (void)monitorPutData2MonitorQueue(data); // ignore
4,057✔
156
      monitorClientSlowQueryInit(connectRsp.clusterId);
4,057✔
157
      monitorClientSQLReqInit(connectRsp.clusterId);
4,057✔
158
    }
159
  }
160

161
  (void)taosThreadMutexLock(&clientHbMgr.lock);
12,383✔
162
  SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
12,383✔
163
  if (pAppHbMgr) {
12,383!
164
    if (hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType) != 0){
12,383!
165
      tscError("0x%" PRIx64 " failed to register conn to hbMgr", pRequest->requestId);
×
166
    }
167
  } else {
168
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
×
169
    code = TSDB_CODE_TSC_DISCONNECTED;
×
170
    goto End;
×
171
  }
172
  (void)taosThreadMutexUnlock(&clientHbMgr.lock);
12,383✔
173

174
  tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
12,383✔
175
           pTscObj->pAppInfo->numOfConns);
176

177
End:
11,132✔
178
  if (code != 0){
12,526✔
179
    setErrno(pRequest, code);
143✔
180
  }
181
  if (tsem_post(&pRequest->body.rspSem) != 0){
12,526!
182
    tscError("failed to post semaphore");
×
183
  }
184

185
  if (pRequest) {
12,526!
186
    (void)releaseRequest(pRequest->self);
12,526✔
187
  }
188

189
EXIT:
×
190
  taosMemoryFree(param);
12,526✔
191
  taosMemoryFree(pMsg->pEpSet);
12,526✔
192
  taosMemoryFree(pMsg->pData);
12,526✔
193
  return code;
12,526✔
194
}
195

196
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
38,041✔
197
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
38,041✔
198
  if(pMsgSendInfo == NULL) return pMsgSendInfo;
38,043!
199
  pMsgSendInfo->requestObjRefId = pRequest->self;
38,043✔
200
  pMsgSendInfo->requestId = pRequest->requestId;
38,043✔
201
  pMsgSendInfo->param = pRequest;
38,043✔
202
  pMsgSendInfo->msgType = pRequest->type;
38,043✔
203
  pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
38,043✔
204

205
  pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
38,043✔
206
  pMsgSendInfo->fp = getMsgRspHandle(pRequest->type);
38,043✔
207
  return pMsgSendInfo;
38,042✔
208
}
209

210
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
4,809✔
211
  // todo rsp with the vnode id list
212
  SRequestObj* pRequest = param;
4,809✔
213
  taosMemoryFree(pMsg->pData);
4,809✔
214
  taosMemoryFree(pMsg->pEpSet);
4,809✔
215
  if (code != TSDB_CODE_SUCCESS) {
4,809✔
216
    setErrno(pRequest, code);
541✔
217
  } else {
218
    struct SCatalog* pCatalog = NULL;
4,268✔
219
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
4,268✔
220
    if (TSDB_CODE_SUCCESS == code) {
4,268!
221
      STscObj* pTscObj = pRequest->pTscObj;
4,268✔
222

223
      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
4,268✔
224
                               .requestId = pRequest->requestId,
4,268✔
225
                               .requestObjRefId = pRequest->self,
4,268✔
226
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
4,268✔
227
      char             dbFName[TSDB_DB_FNAME_LEN];
228
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
4,268✔
229
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0){
4,268!
230
        tscError("0x%" PRIx64 " failed to refresh db vg info", pRequest->requestId);
×
231
      }
232
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
4,268✔
233
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0){
4,268!
234
        tscError("0x%" PRIx64 " failed to refresh db vg info", pRequest->requestId);
×
235
      }
236
    }
237
  }
238

239
  if (pRequest->body.queryFp) {
4,809!
240
    doRequestCallback(pRequest, code);
4,809✔
241
  } else {
242
    if (tsem_post(&pRequest->body.rspSem) != 0){
×
243
      tscError("failed to post semaphore");
×
244
    }
245
  }
246
  return code;
4,809✔
247
}
248

249
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
9,448✔
250
  SRequestObj* pRequest = param;
9,448✔
251
  if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
9,448!
252
      TSDB_CODE_MND_DB_IN_DROPPING == code) {
253
    SUseDbRsp usedbRsp = {0};
2✔
254
    if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0){
2!
255
      tscError("0x%" PRIx64 " deserialize SUseDbRsp failed", pRequest->requestId);
1!
256
    }
257
    struct SCatalog* pCatalog = NULL;
1✔
258

259
    if (usedbRsp.vgVersion >= 0) {  // cached in local
1!
260
      int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
1✔
261
      int32_t  code1 = catalogGetHandle(clusterId, &pCatalog);
1✔
262
      if (code1 != TSDB_CODE_SUCCESS) {
1!
263
        tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
×
264
                tstrerror(code1));
265
      } else {
266
        if (catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid) != 0){
1!
267
          tscError("0x%" PRIx64 "catalogRemoveDB failed, db:%s, uid:%" PRId64, pRequest->requestId, usedbRsp.db,
×
268
                   usedbRsp.uid);
269
        }
270
      }
271
    }
272
    tFreeSUsedbRsp(&usedbRsp);
1✔
273
  }
274

275
  if (code != TSDB_CODE_SUCCESS) {
9,447✔
276
    taosMemoryFree(pMsg->pData);
8✔
277
    taosMemoryFree(pMsg->pEpSet);
8✔
278
    setErrno(pRequest, code);
8✔
279

280
    if (pRequest->body.queryFp != NULL) {
8!
281
      doRequestCallback(pRequest, pRequest->code);
8✔
282

283
    } else {
284
      if (tsem_post(&pRequest->body.rspSem) != 0){
×
285
        tscError("failed to post semaphore");
×
286
      }
287
    }
288

289
    return code;
8✔
290
  }
291

292
  SUseDbRsp usedbRsp = {0};
9,439✔
293
  if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0){
9,439!
294
    tscError("0x%" PRIx64 " deserialize SUseDbRsp failed", pRequest->requestId);
×
295
  }
296

297
  if (strlen(usedbRsp.db) == 0) {
9,438!
298
    taosMemoryFree(pMsg->pData);
×
299
    taosMemoryFree(pMsg->pEpSet);
×
300

301
    if (usedbRsp.errCode != 0) {
×
302
      return usedbRsp.errCode;
×
303
    } else {
304
      return TSDB_CODE_APP_ERROR;
×
305
    }
306
  }
307

308
  tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
9,438✔
309
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
16,600✔
310
    SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
7,161✔
311
    if (pInfo == NULL){
7,161!
312
      continue;
×
313
    }
314
    tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
7,161✔
315
    for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
14,746✔
316
      tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
7,585✔
317
    }
318
  }
319

320
  SName name = {0};
9,439✔
321
  if(tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB) != TSDB_CODE_SUCCESS) {
9,439!
322
    tscError("0x%" PRIx64 " failed to parse db name:%s", pRequest->requestId, usedbRsp.db);
×
323
  }
324

325
  SUseDbOutput output = {0};
9,439✔
326
  code = queryBuildUseDbOutput(&output, &usedbRsp);
9,439✔
327
  if (code != 0) {
9,438!
328
    terrno = code;
×
329
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
×
330

331
    tscError("0x%" PRIx64 " failed to build use db output since %s", pRequest->requestId, terrstr());
×
332
  } else if (output.dbVgroup && output.dbVgroup->vgHash) {
9,438✔
333
    struct SCatalog* pCatalog = NULL;
3,185✔
334

335
    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
3,185✔
336
    if (code1 != TSDB_CODE_SUCCESS) {
3,185!
337
      tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
×
338
              tstrerror(code1));
339
    } else {
340
      if (catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup) != 0){
3,185!
341
        tscError("0x%" PRIx64 " failed to update db vg info, db:%s, dbId:%" PRId64, pRequest->requestId, output.db,
×
342
                 output.dbId);
343
      }
344
      output.dbVgroup = NULL;
3,185✔
345
    }
346
  }
347

348
  taosMemoryFreeClear(output.dbVgroup);
9,438✔
349
  tFreeSUsedbRsp(&usedbRsp);
9,441✔
350

351
  char db[TSDB_DB_NAME_LEN] = {0};
9,437✔
352
  if(tNameGetDbName(&name, db) != TSDB_CODE_SUCCESS) {
9,437!
353
    tscError("0x%" PRIx64 " failed to get db name since %s", pRequest->requestId, tstrerror(code));
×
354
  }
355

356
  setConnectionDB(pRequest->pTscObj, db);
9,438✔
357

358
  taosMemoryFree(pMsg->pData);
9,441✔
359
  taosMemoryFree(pMsg->pEpSet);
9,441✔
360

361
  if (pRequest->body.queryFp != NULL) {
9,441!
362
    doRequestCallback(pRequest, pRequest->code);
9,441✔
363
  } else {
364
    if (tsem_post(&pRequest->body.rspSem) != 0){
×
365
      tscError("failed to post semaphore");
×
366
    }
367
  }
368
  return 0;
9,439✔
369
}
370

371
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
7,226✔
372
  if (pMsg == NULL) {
7,226!
373
    return TSDB_CODE_TSC_INVALID_INPUT;
×
374
  }
375
  if (param == NULL) {
7,226!
376
    taosMemoryFree(pMsg->pEpSet);
×
377
    taosMemoryFree(pMsg->pData);
×
378
    return TSDB_CODE_TSC_INVALID_INPUT;
×
379
  }
380

381
  SRequestObj* pRequest = param;
7,226✔
382

383
  if (code != TSDB_CODE_SUCCESS) {
7,226✔
384
    setErrno(pRequest, code);
75✔
385
  } else {
386
    SMCreateStbRsp createRsp = {0};
7,151✔
387
    SDecoder       coder = {0};
7,151✔
388
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
7,151✔
389
    if (pMsg->len > 0){
7,151✔
390
      code = tDecodeSMCreateStbRsp(&coder, &createRsp);  // pMsg->len == 0
7,086✔
391
      if (code != TSDB_CODE_SUCCESS) {
7,086!
392
        setErrno(pRequest, code);
×
393
      }
394
    }
395
    tDecoderClear(&coder);
7,151✔
396

397
    pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
7,151✔
398
    pRequest->body.resInfo.execRes.res = createRsp.pMeta;
7,151✔
399
  }
400

401
  taosMemoryFree(pMsg->pEpSet);
7,226✔
402
  taosMemoryFree(pMsg->pData);
7,226✔
403

404
  if (pRequest->body.queryFp != NULL) {
7,226✔
405
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
6,255✔
406

407
    if (code == TSDB_CODE_SUCCESS) {
6,255✔
408
      SCatalog* pCatalog = NULL;
6,246✔
409
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
6,246✔
410
      if (pRes->res != NULL) {
6,246✔
411
        ret = handleCreateTbExecRes(pRes->res, pCatalog);
6,234✔
412
      }
413

414
      if (ret != TSDB_CODE_SUCCESS) {
6,246!
415
        code = ret;
×
416
      }
417
    }
418

419
    doRequestCallback(pRequest, code);
6,255✔
420
  } else {
421
    if (tsem_post(&pRequest->body.rspSem) != 0){
971!
422
      tscError("failed to post semaphore");
×
423
    }
424
  }
425
  return code;
7,226✔
426
}
427

428
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
3,455✔
429
  SRequestObj* pRequest = param;
3,455✔
430
  if (code != TSDB_CODE_SUCCESS) {
3,455✔
431
    setErrno(pRequest, code);
85✔
432
  } else {
433
    SDropDbRsp dropdbRsp = {0};
3,370✔
434
    if (tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp) != 0){
3,370!
435
      tscError("0x%" PRIx64 " deserialize SDropDbRsp failed", pRequest->requestId);
×
436
    }
437
    struct SCatalog* pCatalog = NULL;
3,370✔
438
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
3,370✔
439
    if (TSDB_CODE_SUCCESS == code) {
3,370!
440
      if (catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid) != 0){
3,370!
441
        tscError("0x%" PRIx64 " failed to remove db:%s", pRequest->requestId, dropdbRsp.db);
×
442
      }
443
      STscObj* pTscObj = pRequest->pTscObj;
3,370✔
444

445
      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
3,370✔
446
                               .requestId = pRequest->requestId,
3,370✔
447
                               .requestObjRefId = pRequest->self,
3,370✔
448
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
3,370✔
449
      char             dbFName[TSDB_DB_FNAME_LEN] = {0};
3,370✔
450
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
3,370✔
451
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != TSDB_CODE_SUCCESS) {
3,370!
452
        tscError("0x%" PRIx64 " failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
×
453
       }
454
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
3,370✔
455
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
3,370!
456
        tscError("0x%" PRIx64 " failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
×
457
      }
458
    }
459
  }
460

461
  taosMemoryFree(pMsg->pData);
3,455✔
462
  taosMemoryFree(pMsg->pEpSet);
3,455✔
463

464
  if (pRequest->body.queryFp != NULL) {
3,455!
465
    doRequestCallback(pRequest, code);
3,455✔
466
  } else {
467
    if (tsem_post(&pRequest->body.rspSem) != 0){
×
468
      tscError("failed to post semaphore");
×
469
    }
470
  }
471
  return code;
3,455✔
472
}
473

474
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
4,010✔
475
  SRequestObj* pRequest = param;
4,010✔
476
  if (code != TSDB_CODE_SUCCESS) {
4,010✔
477
    setErrno(pRequest, code);
1,596✔
478
  } else {
479
    SMAlterStbRsp alterRsp = {0};
2,414✔
480
    SDecoder      coder = {0};
2,414✔
481
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
2,414✔
482
    if (pMsg->len > 0){
2,414✔
483
      code = tDecodeSMAlterStbRsp(&coder, &alterRsp);  // pMsg->len == 0
2,385✔
484
      if (code != TSDB_CODE_SUCCESS) {
2,385!
485
        setErrno(pRequest, code);
×
486
      }
487
    }
488
    tDecoderClear(&coder);
2,414✔
489

490
    pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
2,414✔
491
    pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
2,414✔
492
  }
493

494
  taosMemoryFree(pMsg->pData);
4,010✔
495
  taosMemoryFree(pMsg->pEpSet);
4,010✔
496

497
  if (pRequest->body.queryFp != NULL) {
4,010!
498
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
4,010✔
499

500
    if (code == TSDB_CODE_SUCCESS) {
4,010✔
501
      SCatalog* pCatalog = NULL;
2,414✔
502
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
2,414✔
503
      if (pRes->res != NULL) {
2,414✔
504
        ret = handleAlterTbExecRes(pRes->res, pCatalog);
2,385✔
505
      }
506

507
      if (ret != TSDB_CODE_SUCCESS) {
2,414!
508
        code = ret;
×
509
      }
510
    }
511

512
    doRequestCallback(pRequest, code);
4,010✔
513
  } else {
514
    if (tsem_post(&pRequest->body.rspSem) != 0){
×
515
      tscError("failed to post semaphore");
×
516
    }
517
  }
518
  return code;
4,010✔
519
}
520

521
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
11✔
522
  int32_t code = 0;
11✔
523
  int32_t line = 0;
11✔
524
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
11✔
525
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
11!
526
  pBlock->info.hasVarCol = true;
11✔
527

528
  pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
11✔
529
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
11!
530
  SColumnInfoData infoData = {0};
11✔
531
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
11✔
532
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN;
11✔
533
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
22!
534

535
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
11✔
536
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN;
11✔
537
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
22!
538

539
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
11✔
540
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD3_LEN;
11✔
541
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
22!
542

543
  int32_t numOfCfg = taosArrayGetSize(pVars);
11✔
544
  code = blockDataEnsureCapacity(pBlock, numOfCfg);
11✔
545
  TSDB_CHECK_CODE(code, line, END);
11!
546

547
  for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
110✔
548
    SVariablesInfo* pInfo = taosArrayGet(pVars, i);
99✔
549
    TSDB_CHECK_NULL(pInfo, code, line, END, terrno);
99!
550

551
    char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
99✔
552
    STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
99✔
553
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
99✔
554
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
99!
555
    code = colDataSetVal(pColInfo, i, name, false);
99✔
556
    TSDB_CHECK_CODE(code, line, END);
99!
557

558
    char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
99✔
559
    STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
99✔
560
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
99✔
561
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
99!
562
    code = colDataSetVal(pColInfo, i, value, false);
99✔
563
    TSDB_CHECK_CODE(code, line, END);
99!
564

565
    char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
99✔
566
    STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
99✔
567
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
99✔
568
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
99!
569
    code = colDataSetVal(pColInfo, i, scope, false);
99✔
570
    TSDB_CHECK_CODE(code, line, END);
99!
571
  }
572

573
  pBlock->info.rows = numOfCfg;
11✔
574

575
  *block = pBlock;
11✔
576
  return code;
11✔
577

578
END:
×
579
  taosArrayDestroy(pBlock->pDataBlock);
×
580
  taosMemoryFree(pBlock);
×
581
  return code;
×
582
}
583

584
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
11✔
585
  SSDataBlock* pBlock = NULL;
11✔
586
  int32_t      code = buildShowVariablesBlock(pVars, &pBlock);
11✔
587
  if (code) {
11!
588
    return code;
×
589
  }
590

591
  size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
11✔
592
  *pRsp = taosMemoryCalloc(1, rspSize);
11✔
593
  if (NULL == *pRsp) {
11!
594
    code = terrno;
×
595
    goto  _exit;
×
596
  }
597

598
  (*pRsp)->useconds = 0;
11✔
599
  (*pRsp)->completed = 1;
11✔
600
  (*pRsp)->precision = 0;
11✔
601
  (*pRsp)->compressed = 0;
11✔
602

603
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
11✔
604
  (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
11✔
605

606
  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, SHOW_VARIABLES_RESULT_COLS);
11✔
607
  if(len < 0) {
11!
608
    uError("buildShowVariablesRsp error, len:%d", len);
×
609
    code = terrno;
×
610
    goto _exit;
×
611
  }
612
  blockDataDestroy(pBlock);
11✔
613

614
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);
11✔
615

616
  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
11✔
617
  (*pRsp)->payloadLen = htonl(payloadLen);
11✔
618
  (*pRsp)->compLen = htonl(payloadLen);
11✔
619

620
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
11!
621
    uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
×
622
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
623
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
624
    goto _exit;
×
625
  }
626

627
  return TSDB_CODE_SUCCESS;
11✔
628
_exit:
×
629
  if(*pRsp)  {
×
630
    taosMemoryFree(*pRsp);
×
631
    *pRsp = NULL;
×
632
  }
633
  if(pBlock) {
×
634
    blockDataDestroy(pBlock);
×
635
    pBlock = NULL;
×
636
  }
637
  return code;
×
638
}
639

640
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
11✔
641
  SRequestObj* pRequest = param;
11✔
642
  if (code != TSDB_CODE_SUCCESS) {
11!
643
    setErrno(pRequest, code);
×
644
  } else {
645
    SShowVariablesRsp  rsp = {0};
11✔
646
    SRetrieveTableRsp* pRes = NULL;
11✔
647
    code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp);
11✔
648
    if (TSDB_CODE_SUCCESS == code) {
11!
649
      code = buildShowVariablesRsp(rsp.variables, &pRes);
11✔
650
    }
651
    if (TSDB_CODE_SUCCESS == code) {
11!
652
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false);
11✔
653
    }
654

655
    if (code != 0) {
11!
656
      taosMemoryFree(pRes);
×
657
    }
658
    tFreeSShowVariablesRsp(&rsp);
11✔
659
  }
660

661
  taosMemoryFree(pMsg->pData);
11✔
662
  taosMemoryFree(pMsg->pEpSet);
11✔
663

664
  if (pRequest->body.queryFp != NULL) {
11!
665
    doRequestCallback(pRequest, code);
11✔
666
  } else {
667
    if (tsem_post(&pRequest->body.rspSem) != 0){
×
668
      tscError("failed to post semaphore");
×
669
    }
670
  }
671
  return code;
11✔
672
}
673

674
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
10✔
675
  int32_t code = 0;
10✔
676
  int32_t line = 0;
10✔
677
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
10✔
678
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
10!
679
  pBlock->info.hasVarCol = true;
10✔
680

681
  pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
10✔
682
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
10!
683
  SColumnInfoData infoData = {0};
10✔
684
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
10✔
685
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
10✔
686
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
20!
687

688
  infoData.info.type = TSDB_DATA_TYPE_INT;
10✔
689
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
10✔
690
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
20!
691

692
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
10✔
693
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
10✔
694
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
20!
695

696
  code = blockDataEnsureCapacity(pBlock, 1);
10✔
697
  TSDB_CHECK_CODE(code, line, END);
10!
698

699
  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
10✔
700
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
10!
701
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
10✔
702
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
10!
703
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
10✔
704
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);
10!
705

706
  char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
10✔
707
  char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
10✔
708
  if (pRsp->bAccepted) {
10!
709
    STR_TO_VARSTR(result, "accepted");
10✔
710
    code = colDataSetVal(pResultCol, 0, result, false);
10✔
711
    TSDB_CHECK_CODE(code, line, END);
10!
712
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
10✔
713
    TSDB_CHECK_CODE(code, line, END);
10!
714
    STR_TO_VARSTR(reason, "success");
10✔
715
    code = colDataSetVal(pReasonCol, 0, reason, false);
10✔
716
    TSDB_CHECK_CODE(code, line, END);
10!
717
  } else {
718
    STR_TO_VARSTR(result, "rejected");
×
719
    code = colDataSetVal(pResultCol, 0, result, false);
×
720
    TSDB_CHECK_CODE(code, line, END);
×
721
    colDataSetNULL(pIdCol, 0);
722
    STR_TO_VARSTR(reason, "compaction is ongoing");
×
723
    code = colDataSetVal(pReasonCol, 0, reason, false);
×
724
    TSDB_CHECK_CODE(code, line, END);
×
725
  }
726
  pBlock->info.rows = 1;
10✔
727

728
  *block = pBlock;
10✔
729

730
  return TSDB_CODE_SUCCESS;
10✔
731
END:
×
732
  taosMemoryFree(pBlock);
×
733
  taosArrayDestroy(pBlock->pDataBlock);
×
734
  return code;
×
735
}
736

737
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
10✔
738
  SSDataBlock* pBlock = NULL;
10✔
739
  int32_t      code = buildCompactDbBlock(pCompactDb, &pBlock);
10✔
740
  if (code) {
10!
741
    return code;
×
742
  }
743

744
  size_t rspSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
10✔
745
  *pRsp = taosMemoryCalloc(1, rspSize);
10✔
746
  if (NULL == *pRsp) {
10!
747
    code = terrno;
×
748
    goto _exit;
×
749
  }
750

751
  (*pRsp)->useconds = 0;
10✔
752
  (*pRsp)->completed = 1;
10✔
753
  (*pRsp)->precision = 0;
10✔
754
  (*pRsp)->compressed = 0;
10✔
755
  (*pRsp)->compLen = 0;
10✔
756
  (*pRsp)->payloadLen = 0;
10✔
757
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
10✔
758
  (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
10✔
759

760
  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, COMPACT_DB_RESULT_COLS);
10✔
761
  if(len < 0) {
10!
762
    uError("buildRetriveTableRspForCompactDb error, len:%d", len);
×
763
    code = terrno;
×
764
    goto _exit;
×
765
  }
766
  blockDataDestroy(pBlock);
10✔
767

768
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);
10✔
769

770
  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
10✔
771
  (*pRsp)->payloadLen = htonl(payloadLen);
10✔
772
  (*pRsp)->compLen = htonl(payloadLen);
10✔
773

774
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
10!
775
    uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
×
776
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
777
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
778
    goto _exit;
×
779
  }
780

781
  return TSDB_CODE_SUCCESS;
10✔
782
_exit:
×
783
  if(*pRsp)  {
×
784
    taosMemoryFree(*pRsp);
×
785
    *pRsp = NULL;
×
786
  }
787
  if(pBlock) {
×
788
    blockDataDestroy(pBlock);
×
789
    pBlock = NULL;
×
790
  }
791
  return code;
×
792
}
793

794

795
int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
10✔
796
  SRequestObj* pRequest = param;
10✔
797
  if (code != TSDB_CODE_SUCCESS) {
10!
798
    setErrno(pRequest, code);
×
799
  } else {
800
    SCompactDbRsp  rsp = {0};
10✔
801
    SRetrieveTableRsp* pRes = NULL;
10✔
802
    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, &rsp);
10✔
803
    if (TSDB_CODE_SUCCESS == code) {
10!
804
      code = buildRetriveTableRspForCompactDb(&rsp, &pRes);
10✔
805
    }
806
    if (TSDB_CODE_SUCCESS == code) {
10!
807
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false);
10✔
808
    }
809

810
    if (code != 0) {
10!
811
      taosMemoryFree(pRes);
×
812
    }
813
  }
814

815
  taosMemoryFree(pMsg->pData);
10✔
816
  taosMemoryFree(pMsg->pEpSet);
10✔
817

818
  if (pRequest->body.queryFp != NULL) {
10!
819
    pRequest->body.queryFp(((SSyncQueryParam *)pRequest->body.interParam)->userParam, pRequest, code);
10✔
820
  } else {
821
    if (tsem_post(&pRequest->body.rspSem) != 0){
×
822
      tscError("failed to post semaphore");
×
823
    }
824
  }
825
  return code;  
10✔
826
}
827

828
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
50,567✔
829
  switch (msgType) {
50,567✔
830
    case TDMT_MND_CONNECT:
12,526✔
831
      return processConnectRsp;
12,526✔
832
    case TDMT_MND_CREATE_DB:
4,808✔
833
      return processCreateDbRsp;
4,808✔
834
    case TDMT_MND_USE_DB:
9,448✔
835
      return processUseDbRsp;
9,448✔
836
    case TDMT_MND_CREATE_STB:
7,226✔
837
      return processCreateSTableRsp;
7,226✔
838
    case TDMT_MND_DROP_DB:
3,455✔
839
      return processDropDbRsp;
3,455✔
840
    case TDMT_MND_ALTER_STB:
4,010✔
841
      return processAlterStbRsp;
4,010✔
842
    case TDMT_MND_SHOW_VARIABLES:
11✔
843
      return processShowVariablesRsp;
11✔
844
    case TDMT_MND_COMPACT_DB:
10✔
845
      return processCompactDbRsp;  
10✔
846
    default:
9,073✔
847
      return genericRspCallback;
9,073✔
848
  }
849
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc