• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3653

14 Mar 2025 08:10AM UTC coverage: 22.565% (-41.0%) from 63.596%
#3653

push

travis-ci

web-flow
feat(keep): support keep on super table level. (#30097)

* Feat: support use keep while create super table.

* Test(keep): add test for create super table with keep option.

* Feat(keep): Add tmsg for create keep.

* Feat(keep): support alter table option keep.

* Fix(keep): Add baisc test for alter table option.

* Fix(keep): memory leek.

* Feat(keep): add keep to metaEntry&metaCache and fix earliestTs with stn keep.

* Test(keep): add some cases for select with stb keep.

* Fix: fix ci core while alter stb.

* Feat(keep): delete expired data in super table level.

* Feat: remove get stb keep while query.

* Fix : build error.

* Revert "Fix : build error."

This reverts commit 0ed66e4e8.

* Revert "Feat(keep): delete expired data in super table level."

This reverts commit 36330f6b4.

* Fix : build errors.

* Feat : support restart taosd.

* Fix : alter table comment problems.

* Test : add tests for super table keep.

* Fix: change sdb stb reserve size.

* Test: add more tests.

* Feat: Disable normal tables and sub tables from setting the keep parameter

* Fix: add more checks to avoid unknown address.

* Docs: Add docs for stable keep.

* Fix: some review changes.

* Fix: review errors.

49248 of 302527 branches covered (16.28%)

Branch coverage included in aggregate %.

53 of 99 new or added lines in 12 files covered. (53.54%)

155872 existing lines in 443 files now uncovered.

87359 of 302857 relevant lines covered (28.84%)

570004.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.62
/source/client/src/clientMsgHandler.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "catalog.h"
17
#include "clientInt.h"
18
#include "clientLog.h"
19
#include "clientMonitor.h"
20
#include "cmdnodes.h"
21
#include "command.h"
22
#include "os.h"
23
#include "query.h"
24
#include "systable.h"
25
#include "tdatablock.h"
26
#include "tdef.h"
27
#include "tglobal.h"
28
#include "tname.h"
29
#include "tversion.h"
30

31
extern SClientHbMgr clientHbMgr;
32

33
static void setErrno(SRequestObj* pRequest, int32_t code) {
16✔
34
  pRequest->code = code;
16✔
35
  terrno = code;
16✔
36
}
16✔
37

38
int32_t genericRspCallback(void* param, SDataBuf* pMsg, int32_t code) {
2✔
39
  SRequestObj* pRequest = param;
2✔
40
  setErrno(pRequest, code);
2✔
41

42
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
2!
UNCOV
43
    if (removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)) != 0) {
×
44
      tscError("failed to remove meta data for table");
×
45
    }
46
  }
47

48
  taosMemoryFree(pMsg->pEpSet);
2!
49
  taosMemoryFree(pMsg->pData);
2!
50
  if (pRequest->body.queryFp != NULL) {
2!
51
    doRequestCallback(pRequest, code);
2✔
52
  } else {
UNCOV
53
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
54
      tscError("failed to post semaphore");
×
55
    }
56
  }
57
  return code;
2✔
58
}
59

60
int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
36✔
61
  SRequestObj* pRequest = acquireRequest(*(int64_t*)param);
36✔
62
  if (NULL == pRequest) {
36!
63
    goto EXIT;
×
64
  }
65

66
  if (code != TSDB_CODE_SUCCESS) {
36!
UNCOV
67
    goto End;
×
68
  }
69

70
  STscObj* pTscObj = pRequest->pTscObj;
36✔
71

72
  if (NULL == pTscObj->pAppInfo) {
36!
73
    code = TSDB_CODE_TSC_DISCONNECTED;
×
74
    goto End;
×
75
  }
76

77
  SConnectRsp connectRsp = {0};
36✔
78
  if (tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp) != 0) {
36!
79
    code = TSDB_CODE_TSC_INVALID_VERSION;
×
80
    goto End;
×
81
  }
82

83
  if ((code = taosCheckVersionCompatibleFromStr(td_version, connectRsp.sVer, 3)) != 0) {
36!
84
    tscError("version not compatible. client version:%s, server version:%s", td_version, connectRsp.sVer);
×
85
    goto End;
×
86
  }
87

88
  int32_t now = taosGetTimestampSec();
36✔
89
  int32_t delta = abs(now - connectRsp.svrTimestamp);
36✔
90
  if (delta > timestampDeltaLimit) {
36!
91
    code = TSDB_CODE_TIME_UNSYNCED;
×
92
    tscError("time diff:%ds is too big", delta);
×
93
    goto End;
×
94
  }
95

96
  if (connectRsp.epSet.numOfEps == 0) {
36!
97
    code = TSDB_CODE_APP_ERROR;
×
98
    goto End;
×
99
  }
100

101
  int updateEpSet = 1;
36✔
102
  if (connectRsp.dnodeNum == 1) {
36✔
103
    SEpSet srcEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
7✔
104
    SEpSet dstEpSet = connectRsp.epSet;
7✔
105
    if (srcEpSet.numOfEps == 1) {
7!
106
      if (rpcSetDefaultAddr(pTscObj->pAppInfo->pTransporter, srcEpSet.eps[srcEpSet.inUse].fqdn,
7!
107
                            dstEpSet.eps[dstEpSet.inUse].fqdn) != 0) {
7✔
108
        tscError("failed to set default addr for rpc");
×
109
      }
110
      updateEpSet = 0;
7✔
111
    }
112
  }
113
  if (updateEpSet == 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
36✔
114
    SEpSet corEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
6✔
115

116
    SEpSet* pOrig = &corEpSet;
6✔
117
    SEp*    pOrigEp = &pOrig->eps[pOrig->inUse];
6✔
118
    SEp*    pNewEp = &connectRsp.epSet.eps[connectRsp.epSet.inUse];
6✔
119
    tscDebug("mnode epset updated from %d/%d=>%s:%d to %d/%d=>%s:%d in connRsp", pOrig->inUse, pOrig->numOfEps,
6!
120
             pOrigEp->fqdn, pOrigEp->port, connectRsp.epSet.inUse, connectRsp.epSet.numOfEps, pNewEp->fqdn,
121
             pNewEp->port);
122
    updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
6✔
123
  }
124

125
  for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
72✔
126
    tscDebug("QID:0x%" PRIx64 ", epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
36!
127
             connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
128
  }
129

130
  pTscObj->sysInfo = connectRsp.sysInfo;
36✔
131
  pTscObj->connId = connectRsp.connId;
36✔
132
  pTscObj->acctId = connectRsp.acctId;
36✔
133
  tstrncpy(pTscObj->sVer, connectRsp.sVer, tListLen(pTscObj->sVer));
36✔
134
  tstrncpy(pTscObj->sDetailVer, connectRsp.sDetailVer, tListLen(pTscObj->sDetailVer));
36✔
135

136
  // update the appInstInfo
137
  pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
36✔
138
  pTscObj->pAppInfo->serverCfg.monitorParas = connectRsp.monitorParas;
36✔
139
  pTscObj->pAppInfo->serverCfg.enableAuditDelete = connectRsp.enableAuditDelete;
36✔
140
  tscDebug("monitor paras from connect rsp, clusterId:0x%" PRIx64 ", threshold:%d scope:%d",
36!
141
           connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope);
142
  lastClusterId = connectRsp.clusterId;
36✔
143

144
  pTscObj->connType = connectRsp.connType;
36✔
145
  pTscObj->passInfo.ver = connectRsp.passVer;
36✔
146
  pTscObj->authVer = connectRsp.authVer;
36✔
147
  pTscObj->whiteListInfo.ver = connectRsp.whiteListVer;
36✔
148

149
  if (taosHashGet(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES) == NULL) {
36✔
150
    if (taosHashPut(appInfo.pInstMapByClusterId, &connectRsp.clusterId, LONG_BYTES, &pTscObj->pAppInfo,
3!
151
                    POINTER_BYTES) != 0) {
152
      tscError("failed to put appInfo into appInfo.pInstMapByClusterId");
×
153
    } else {
154
#ifdef USE_MONITOR
155
      MonitorSlowLogData data = {0};
3✔
156
      data.clusterId = pTscObj->pAppInfo->clusterId;
3✔
157
      data.type = SLOW_LOG_READ_BEGINNIG;
3✔
158
      (void)monitorPutData2MonitorQueue(data);  // ignore
3✔
159
      monitorClientSlowQueryInit(connectRsp.clusterId);
3✔
160
      monitorClientSQLReqInit(connectRsp.clusterId);
3✔
161
#endif
162
    }
163
  }
164

165
  (void)taosThreadMutexLock(&clientHbMgr.lock);
36✔
166
  SAppHbMgr* pAppHbMgr = taosArrayGetP(clientHbMgr.appHbMgrs, pTscObj->appHbMgrIdx);
36✔
167
  if (pAppHbMgr) {
36!
168
    if (hbRegisterConn(pAppHbMgr, pTscObj->id, connectRsp.clusterId, connectRsp.connType) != 0) {
36!
UNCOV
169
      tscError("QID:0x%" PRIx64 ", failed to register conn to hbMgr", pRequest->requestId);
×
170
    }
171
  } else {
172
    (void)taosThreadMutexUnlock(&clientHbMgr.lock);
×
UNCOV
173
    code = TSDB_CODE_TSC_DISCONNECTED;
×
UNCOV
174
    goto End;
×
175
  }
176
  (void)taosThreadMutexUnlock(&clientHbMgr.lock);
36✔
177

178
  tscDebug("QID:0x%" PRIx64 ", clusterId:0x%" PRIx64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
36!
179
           pTscObj->pAppInfo->numOfConns);
180

181
End:
36✔
182
  if (code != 0) {
36!
UNCOV
183
    setErrno(pRequest, code);
×
184
  }
185
  if (tsem_post(&pRequest->body.rspSem) != 0) {
36!
UNCOV
186
    tscError("failed to post semaphore");
×
187
  }
188

189
  if (pRequest) {
36!
190
    (void)releaseRequest(pRequest->self);
36✔
191
  }
192

UNCOV
193
EXIT:
×
194
  taosMemoryFree(param);
36!
195
  taosMemoryFree(pMsg->pEpSet);
36!
196
  taosMemoryFree(pMsg->pData);
36!
197
  return code;
36✔
198
}
199

200
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pRequest) {
44✔
201
  SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
44!
202
  if (pMsgSendInfo == NULL) return pMsgSendInfo;
44!
203
  pMsgSendInfo->requestObjRefId = pRequest->self;
44✔
204
  pMsgSendInfo->requestId = pRequest->requestId;
44✔
205
  pMsgSendInfo->param = pRequest;
44✔
206
  pMsgSendInfo->msgType = pRequest->type;
44✔
207
  pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
44✔
208

209
  pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
44✔
210
  pMsgSendInfo->fp = getMsgRspHandle(pRequest->type);
44✔
211
  return pMsgSendInfo;
44✔
212
}
213

214
int32_t processCreateDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
15✔
215
  // todo rsp with the vnode id list
216
  SRequestObj* pRequest = param;
15✔
217
  taosMemoryFree(pMsg->pData);
15!
218
  taosMemoryFree(pMsg->pEpSet);
15!
219
  if (code != TSDB_CODE_SUCCESS) {
15✔
220
    setErrno(pRequest, code);
8✔
221
  } else {
222
    struct SCatalog* pCatalog = NULL;
7✔
223
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
7✔
224
    if (TSDB_CODE_SUCCESS == code) {
7!
225
      STscObj* pTscObj = pRequest->pTscObj;
7✔
226

227
      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
7✔
228
                               .requestId = pRequest->requestId,
7✔
229
                               .requestObjRefId = pRequest->self,
7✔
230
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
7✔
231
      char             dbFName[TSDB_DB_FNAME_LEN];
232
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
7✔
233
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
7!
UNCOV
234
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info", pRequest->requestId);
×
235
      }
236
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
7✔
237
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
7!
UNCOV
238
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info", pRequest->requestId);
×
239
      }
240
    }
241
  }
242

243
  if (pRequest->body.queryFp) {
15!
244
    doRequestCallback(pRequest, code);
15✔
245
  } else {
UNCOV
246
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
247
      tscError("failed to post semaphore");
×
248
    }
249
  }
250
  return code;
15✔
251
}
252

253
int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
6✔
254
  SRequestObj* pRequest = param;
6✔
255
  if (TSDB_CODE_MND_DB_NOT_EXIST == code || TSDB_CODE_MND_DB_IN_CREATING == code ||
6!
256
      TSDB_CODE_MND_DB_IN_DROPPING == code) {
257
    SUseDbRsp usedbRsp = {0};
6✔
258
    if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
6!
259
      tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
6!
260
    }
261
    struct SCatalog* pCatalog = NULL;
6✔
262

263
    if (usedbRsp.vgVersion >= 0) {  // cached in local
6!
264
      int64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
6✔
265
      int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
6✔
266
      if (code1 != TSDB_CODE_SUCCESS) {
6!
UNCOV
267
        tscWarn("QID:0x%" PRIx64 ", catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->requestId, clusterId,
×
268
                tstrerror(code1));
269
      } else {
270
        if (catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid) != 0) {
6!
UNCOV
271
          tscError("QID:0x%" PRIx64 ", catalogRemoveDB failed, db:%s, uid:%" PRId64, pRequest->requestId, usedbRsp.db,
×
272
                   usedbRsp.uid);
273
        }
274
      }
275
    }
276
    tFreeSUsedbRsp(&usedbRsp);
6✔
277
  }
278

279
  if (code != TSDB_CODE_SUCCESS) {
6!
280
    taosMemoryFree(pMsg->pData);
6!
281
    taosMemoryFree(pMsg->pEpSet);
6!
282
    setErrno(pRequest, code);
6✔
283

284
    if (pRequest->body.queryFp != NULL) {
6!
285
      doRequestCallback(pRequest, pRequest->code);
6✔
286

287
    } else {
UNCOV
288
      if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
289
        tscError("failed to post semaphore");
×
290
      }
291
    }
292

293
    return code;
6✔
294
  }
295

296
  SUseDbRsp usedbRsp = {0};
×
UNCOV
297
  if (tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp) != 0) {
×
UNCOV
298
    tscError("QID:0x%" PRIx64 ", deserialize SUseDbRsp failed", pRequest->requestId);
×
299
  }
300

301
  if (strlen(usedbRsp.db) == 0) {
×
UNCOV
302
    taosMemoryFree(pMsg->pData);
×
303
    taosMemoryFree(pMsg->pEpSet);
×
304

UNCOV
305
    if (usedbRsp.errCode != 0) {
×
306
      return usedbRsp.errCode;
×
307
    } else {
UNCOV
308
      return TSDB_CODE_APP_ERROR;
×
309
    }
310
  }
311

UNCOV
312
  tscTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
×
UNCOV
313
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
×
314
    SVgroupInfo* pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
×
UNCOV
315
    if (pInfo == NULL) {
×
UNCOV
316
      continue;
×
317
    }
UNCOV
318
    tscTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
×
UNCOV
319
    for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
×
UNCOV
320
      tscTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
×
321
    }
322
  }
323

324
  SName name = {0};
×
UNCOV
325
  if (tNameFromString(&name, usedbRsp.db, T_NAME_ACCT | T_NAME_DB) != TSDB_CODE_SUCCESS) {
×
UNCOV
326
    tscError("QID:0x%" PRIx64 ", failed to parse db name:%s", pRequest->requestId, usedbRsp.db);
×
327
  }
328

UNCOV
329
  SUseDbOutput output = {0};
×
330
  code = queryBuildUseDbOutput(&output, &usedbRsp);
×
331
  if (code != 0) {
×
UNCOV
332
    terrno = code;
×
333
    if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
×
334

UNCOV
335
    tscError("QID:0x%" PRIx64 ", failed to build use db output since %s", pRequest->requestId, terrstr());
×
UNCOV
336
  } else if (output.dbVgroup && output.dbVgroup->vgHash) {
×
UNCOV
337
    struct SCatalog* pCatalog = NULL;
×
338

339
    int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
UNCOV
340
    if (code1 != TSDB_CODE_SUCCESS) {
×
UNCOV
341
      tscWarn("catalogGetHandle failed, clusterId:0x%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
×
342
              tstrerror(code1));
343
    } else {
UNCOV
344
      if (catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup) != 0) {
×
UNCOV
345
        tscError("QID:0x%" PRIx64 ", failed to update db vg info, db:%s, dbId:%" PRId64, pRequest->requestId, output.db,
×
346
                 output.dbId);
347
      }
UNCOV
348
      output.dbVgroup = NULL;
×
349
    }
350
  }
351

UNCOV
352
  taosMemoryFreeClear(output.dbVgroup);
×
UNCOV
353
  tFreeSUsedbRsp(&usedbRsp);
×
354

355
  char db[TSDB_DB_NAME_LEN] = {0};
×
UNCOV
356
  if (tNameGetDbName(&name, db) != TSDB_CODE_SUCCESS) {
×
UNCOV
357
    tscError("QID:0x%" PRIx64 ", failed to get db name since %s", pRequest->requestId, tstrerror(code));
×
358
  }
359

UNCOV
360
  setConnectionDB(pRequest->pTscObj, db);
×
361

UNCOV
362
  taosMemoryFree(pMsg->pData);
×
UNCOV
363
  taosMemoryFree(pMsg->pEpSet);
×
364

UNCOV
365
  if (pRequest->body.queryFp != NULL) {
×
366
    doRequestCallback(pRequest, pRequest->code);
×
367
  } else {
UNCOV
368
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
369
      tscError("failed to post semaphore");
×
370
    }
371
  }
UNCOV
372
  return 0;
×
373
}
374

375
int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) {
3✔
376
  if (pMsg == NULL) {
3!
UNCOV
377
    tscError("processCreateSTableRsp: invalid input param, pMsg is NULL");
×
UNCOV
378
    return TSDB_CODE_TSC_INVALID_INPUT;
×
379
  }
380
  if (param == NULL) {
3!
381
    taosMemoryFree(pMsg->pEpSet);
×
382
    taosMemoryFree(pMsg->pData);
×
UNCOV
383
    tscError("processCreateSTableRsp: invalid input param, param is NULL");
×
UNCOV
384
    return TSDB_CODE_TSC_INVALID_INPUT;
×
385
  }
386

387
  SRequestObj* pRequest = param;
3✔
388

389
  if (code != TSDB_CODE_SUCCESS) {
3!
UNCOV
390
    setErrno(pRequest, code);
×
391
  } else {
392
    SMCreateStbRsp createRsp = {0};
3✔
393
    SDecoder       coder = {0};
3✔
394
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
3✔
395
    if (pMsg->len > 0) {
3!
396
      code = tDecodeSMCreateStbRsp(&coder, &createRsp);  // pMsg->len == 0
3✔
397
      if (code != TSDB_CODE_SUCCESS) {
3!
UNCOV
398
        setErrno(pRequest, code);
×
399
      }
400
    }
401
    tDecoderClear(&coder);
3✔
402

403
    pRequest->body.resInfo.execRes.msgType = TDMT_MND_CREATE_STB;
3✔
404
    pRequest->body.resInfo.execRes.res = createRsp.pMeta;
3✔
405
  }
406

407
  taosMemoryFree(pMsg->pEpSet);
3!
408
  taosMemoryFree(pMsg->pData);
3!
409

410
  if (pRequest->body.queryFp != NULL) {
3!
411
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
3✔
412

413
    if (code == TSDB_CODE_SUCCESS) {
3!
414
      SCatalog* pCatalog = NULL;
3✔
415
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
3✔
416
      if (pRes->res != NULL) {
3!
417
        ret = handleCreateTbExecRes(pRes->res, pCatalog);
3✔
418
      }
419

420
      if (ret != TSDB_CODE_SUCCESS) {
3!
UNCOV
421
        code = ret;
×
422
      }
423
    }
424

425
    doRequestCallback(pRequest, code);
3✔
426
  } else {
UNCOV
427
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
428
      tscError("failed to post semaphore");
×
429
    }
430
  }
431
  return code;
3✔
432
}
433

434
int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
18✔
435
  SRequestObj* pRequest = param;
18✔
436
  if (code != TSDB_CODE_SUCCESS) {
18!
UNCOV
437
    setErrno(pRequest, code);
×
438
  } else {
439
    SDropDbRsp dropdbRsp = {0};
18✔
440
    if (tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp) != 0) {
18!
UNCOV
441
      tscError("QID:0x%" PRIx64 ", deserialize SDropDbRsp failed", pRequest->requestId);
×
442
    }
443
    struct SCatalog* pCatalog = NULL;
18✔
444
    code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
18✔
445
    if (TSDB_CODE_SUCCESS == code) {
18!
446
      if (catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid) != 0) {
18!
UNCOV
447
        tscError("QID:0x%" PRIx64 ", failed to remove db:%s", pRequest->requestId, dropdbRsp.db);
×
448
      }
449
      STscObj* pTscObj = pRequest->pTscObj;
18✔
450

451
      SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
18✔
452
                               .requestId = pRequest->requestId,
18✔
453
                               .requestObjRefId = pRequest->self,
18✔
454
                               .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
18✔
455
      char             dbFName[TSDB_DB_FNAME_LEN] = {0};
18✔
456
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_INFORMATION_SCHEMA_DB);
18✔
457
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != TSDB_CODE_SUCCESS) {
18!
UNCOV
458
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
×
459
      }
460
      (void)snprintf(dbFName, sizeof(dbFName) - 1, "%d.%s", pTscObj->acctId, TSDB_PERFORMANCE_SCHEMA_DB);
18✔
461
      if (catalogRefreshDBVgInfo(pCatalog, &conn, dbFName) != 0) {
18!
UNCOV
462
        tscError("QID:0x%" PRIx64 ", failed to refresh db vg info, db:%s", pRequest->requestId, dbFName);
×
463
      }
464
    }
465
  }
466

467
  taosMemoryFree(pMsg->pData);
18!
468
  taosMemoryFree(pMsg->pEpSet);
18!
469

470
  if (pRequest->body.queryFp != NULL) {
18!
471
    doRequestCallback(pRequest, code);
18✔
472
  } else {
UNCOV
473
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
474
      tscError("failed to post semaphore");
×
475
    }
476
  }
477
  return code;
18✔
478
}
479

UNCOV
480
int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
×
UNCOV
481
  SRequestObj* pRequest = param;
×
UNCOV
482
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
483
    setErrno(pRequest, code);
×
484
  } else {
UNCOV
485
    SMAlterStbRsp alterRsp = {0};
×
UNCOV
486
    SDecoder      coder = {0};
×
UNCOV
487
    tDecoderInit(&coder, pMsg->pData, pMsg->len);
×
UNCOV
488
    if (pMsg->len > 0) {
×
489
      code = tDecodeSMAlterStbRsp(&coder, &alterRsp);  // pMsg->len == 0
×
UNCOV
490
      if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
491
        setErrno(pRequest, code);
×
492
      }
493
    }
UNCOV
494
    tDecoderClear(&coder);
×
495

UNCOV
496
    pRequest->body.resInfo.execRes.msgType = TDMT_MND_ALTER_STB;
×
UNCOV
497
    pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
×
498
  }
499

UNCOV
500
  taosMemoryFree(pMsg->pData);
×
UNCOV
501
  taosMemoryFree(pMsg->pEpSet);
×
502

UNCOV
503
  if (pRequest->body.queryFp != NULL) {
×
UNCOV
504
    SExecResult* pRes = &pRequest->body.resInfo.execRes;
×
505

UNCOV
506
    if (code == TSDB_CODE_SUCCESS) {
×
UNCOV
507
      SCatalog* pCatalog = NULL;
×
UNCOV
508
      int32_t   ret = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
×
UNCOV
509
      if (pRes->res != NULL) {
×
UNCOV
510
        ret = handleAlterTbExecRes(pRes->res, pCatalog);
×
511
      }
512

UNCOV
513
      if (ret != TSDB_CODE_SUCCESS) {
×
UNCOV
514
        code = ret;
×
515
      }
516
    }
517

518
    doRequestCallback(pRequest, code);
×
519
  } else {
UNCOV
520
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
521
      tscError("failed to post semaphore");
×
522
    }
523
  }
UNCOV
524
  return code;
×
525
}
526

UNCOV
527
static int32_t buildShowVariablesBlock(SArray* pVars, SSDataBlock** block) {
×
UNCOV
528
  int32_t      code = 0;
×
UNCOV
529
  int32_t      line = 0;
×
UNCOV
530
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
×
UNCOV
531
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
×
UNCOV
532
  pBlock->info.hasVarCol = true;
×
533

UNCOV
534
  pBlock->pDataBlock = taosArrayInit(SHOW_VARIABLES_RESULT_COLS, sizeof(SColumnInfoData));
×
UNCOV
535
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
×
UNCOV
536
  SColumnInfoData infoData = {0};
×
UNCOV
537
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
×
UNCOV
538
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD1_LEN;
×
UNCOV
539
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
×
540

UNCOV
541
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
×
UNCOV
542
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD2_LEN;
×
UNCOV
543
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
×
544

UNCOV
545
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
×
UNCOV
546
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD3_LEN;
×
UNCOV
547
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
×
548

UNCOV
549
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
×
UNCOV
550
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD4_LEN;
×
UNCOV
551
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
×
552

UNCOV
553
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
×
UNCOV
554
  infoData.info.bytes = SHOW_VARIABLES_RESULT_FIELD5_LEN;
×
UNCOV
555
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
×
556

UNCOV
557
  int32_t numOfCfg = taosArrayGetSize(pVars);
×
UNCOV
558
  code = blockDataEnsureCapacity(pBlock, numOfCfg);
×
UNCOV
559
  TSDB_CHECK_CODE(code, line, END);
×
560

UNCOV
561
  for (int32_t i = 0, c = 0; i < numOfCfg; ++i, c = 0) {
×
UNCOV
562
    SVariablesInfo* pInfo = taosArrayGet(pVars, i);
×
UNCOV
563
    TSDB_CHECK_NULL(pInfo, code, line, END, terrno);
×
564

UNCOV
565
    char name[TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
566
    STR_WITH_MAXSIZE_TO_VARSTR(name, pInfo->name, TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE);
×
UNCOV
567
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
×
UNCOV
568
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
×
UNCOV
569
    code = colDataSetVal(pColInfo, i, name, false);
×
UNCOV
570
    TSDB_CHECK_CODE(code, line, END);
×
571

UNCOV
572
    char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
573
    STR_WITH_MAXSIZE_TO_VARSTR(value, pInfo->value, TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE);
×
UNCOV
574
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
×
UNCOV
575
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
×
UNCOV
576
    code = colDataSetVal(pColInfo, i, value, false);
×
UNCOV
577
    TSDB_CHECK_CODE(code, line, END);
×
578

UNCOV
579
    char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
580
    STR_WITH_MAXSIZE_TO_VARSTR(scope, pInfo->scope, TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE);
×
UNCOV
581
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
×
UNCOV
582
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
×
UNCOV
583
    code = colDataSetVal(pColInfo, i, scope, false);
×
UNCOV
584
    TSDB_CHECK_CODE(code, line, END);
×
585

UNCOV
586
    char category[TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
587
    STR_WITH_MAXSIZE_TO_VARSTR(category, pInfo->category, TSDB_CONFIG_CATEGORY_LEN + VARSTR_HEADER_SIZE);
×
UNCOV
588
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
×
UNCOV
589
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
×
UNCOV
590
    code = colDataSetVal(pColInfo, i, category, false);
×
UNCOV
591
    TSDB_CHECK_CODE(code, line, END);
×
592

UNCOV
593
    char info[TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE] = {0};
×
UNCOV
594
    STR_WITH_MAXSIZE_TO_VARSTR(info, pInfo->info, TSDB_CONFIG_INFO_LEN + VARSTR_HEADER_SIZE);
×
UNCOV
595
    pColInfo = taosArrayGet(pBlock->pDataBlock, c++);
×
UNCOV
596
    TSDB_CHECK_NULL(pColInfo, code, line, END, terrno);
×
UNCOV
597
    code = colDataSetVal(pColInfo, i, info, false);
×
UNCOV
598
    TSDB_CHECK_CODE(code, line, END);
×
599
  }
600

UNCOV
601
  pBlock->info.rows = numOfCfg;
×
602

UNCOV
603
  *block = pBlock;
×
604
  return code;
×
605

606
END:
×
607
  taosArrayDestroy(pBlock->pDataBlock);
×
UNCOV
608
  taosMemoryFree(pBlock);
×
UNCOV
609
  return code;
×
610
}
611

UNCOV
612
static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
×
UNCOV
613
  SSDataBlock* pBlock = NULL;
×
614
  int32_t      code = buildShowVariablesBlock(pVars, &pBlock);
×
UNCOV
615
  if (code) {
×
UNCOV
616
    return code;
×
617
  }
618

UNCOV
619
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
×
UNCOV
620
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
×
621
  *pRsp = taosMemoryCalloc(1, rspSize);
×
622
  if (NULL == *pRsp) {
×
UNCOV
623
    code = terrno;
×
UNCOV
624
    goto _exit;
×
625
  }
626

UNCOV
627
  (*pRsp)->useconds = 0;
×
UNCOV
628
  (*pRsp)->completed = 1;
×
UNCOV
629
  (*pRsp)->precision = 0;
×
UNCOV
630
  (*pRsp)->compressed = 0;
×
631

UNCOV
632
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
×
UNCOV
633
  (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
×
634

UNCOV
635
  int32_t len = 0;
×
UNCOV
636
  if ((*pRsp)->numOfRows > 0) {
×
637
    len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, SHOW_VARIABLES_RESULT_COLS);
×
638
    if (len < 0) {
×
639
      uError("buildShowVariablesRsp error, len:%d", len);
×
UNCOV
640
      code = terrno;
×
UNCOV
641
      goto _exit;
×
642
    }
UNCOV
643
    SET_PAYLOAD_LEN((*pRsp)->data, len, len);
×
644

UNCOV
645
    int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
×
UNCOV
646
    (*pRsp)->payloadLen = htonl(payloadLen);
×
UNCOV
647
    (*pRsp)->compLen = htonl(payloadLen);
×
648

UNCOV
649
    if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
×
650
      uError("buildShowVariablesRsp error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
×
651
             (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
UNCOV
652
      code = TSDB_CODE_TSC_INVALID_INPUT;
×
UNCOV
653
      goto _exit;
×
654
    }
655
  }
656

UNCOV
657
  blockDataDestroy(pBlock);
×
UNCOV
658
  pBlock = NULL;
×
659

660
  return TSDB_CODE_SUCCESS;
×
661
_exit:
×
662
  if (*pRsp) {
×
UNCOV
663
    taosMemoryFree(*pRsp);
×
664
    *pRsp = NULL;
×
665
  }
666
  if (pBlock) {
×
UNCOV
667
    blockDataDestroy(pBlock);
×
668
    pBlock = NULL;
×
669
  }
UNCOV
670
  return code;
×
671
}
672

UNCOV
673
int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
×
674
  SRequestObj* pRequest = param;
×
UNCOV
675
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
676
    setErrno(pRequest, code);
×
677
  } else {
UNCOV
678
    SShowVariablesRsp  rsp = {0};
×
UNCOV
679
    SRetrieveTableRsp* pRes = NULL;
×
UNCOV
680
    code = tDeserializeSShowVariablesRsp(pMsg->pData, pMsg->len, &rsp);
×
UNCOV
681
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
682
      code = buildShowVariablesRsp(rsp.variables, &pRes);
×
683
    }
UNCOV
684
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
685
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false);
×
686
    }
687

UNCOV
688
    if (code != 0) {
×
UNCOV
689
      taosMemoryFree(pRes);
×
690
    }
UNCOV
691
    tFreeSShowVariablesRsp(&rsp);
×
692
  }
693

UNCOV
694
  taosMemoryFree(pMsg->pData);
×
UNCOV
695
  taosMemoryFree(pMsg->pEpSet);
×
696

UNCOV
697
  if (pRequest->body.queryFp != NULL) {
×
698
    doRequestCallback(pRequest, code);
×
699
  } else {
UNCOV
700
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
701
      tscError("failed to post semaphore");
×
702
    }
703
  }
UNCOV
704
  return code;
×
705
}
706

UNCOV
707
static int32_t buildCompactDbBlock(SCompactDbRsp* pRsp, SSDataBlock** block) {
×
UNCOV
708
  int32_t      code = 0;
×
UNCOV
709
  int32_t      line = 0;
×
UNCOV
710
  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
×
UNCOV
711
  TSDB_CHECK_NULL(pBlock, code, line, END, terrno);
×
UNCOV
712
  pBlock->info.hasVarCol = true;
×
713

UNCOV
714
  pBlock->pDataBlock = taosArrayInit(COMPACT_DB_RESULT_COLS, sizeof(SColumnInfoData));
×
UNCOV
715
  TSDB_CHECK_NULL(pBlock->pDataBlock, code, line, END, terrno);
×
UNCOV
716
  SColumnInfoData infoData = {0};
×
UNCOV
717
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
×
UNCOV
718
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD1_LEN;
×
UNCOV
719
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
×
720

UNCOV
721
  infoData.info.type = TSDB_DATA_TYPE_INT;
×
UNCOV
722
  infoData.info.bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;
×
UNCOV
723
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
×
724

UNCOV
725
  infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
×
UNCOV
726
  infoData.info.bytes = COMPACT_DB_RESULT_FIELD3_LEN;
×
UNCOV
727
  TSDB_CHECK_NULL(taosArrayPush(pBlock->pDataBlock, &infoData), code, line, END, terrno);
×
728

UNCOV
729
  code = blockDataEnsureCapacity(pBlock, 1);
×
UNCOV
730
  TSDB_CHECK_CODE(code, line, END);
×
731

UNCOV
732
  SColumnInfoData* pResultCol = taosArrayGet(pBlock->pDataBlock, 0);
×
UNCOV
733
  TSDB_CHECK_NULL(pResultCol, code, line, END, terrno);
×
UNCOV
734
  SColumnInfoData* pIdCol = taosArrayGet(pBlock->pDataBlock, 1);
×
UNCOV
735
  TSDB_CHECK_NULL(pIdCol, code, line, END, terrno);
×
UNCOV
736
  SColumnInfoData* pReasonCol = taosArrayGet(pBlock->pDataBlock, 2);
×
UNCOV
737
  TSDB_CHECK_NULL(pReasonCol, code, line, END, terrno);
×
738

UNCOV
739
  char result[COMPACT_DB_RESULT_FIELD1_LEN] = {0};
×
UNCOV
740
  char reason[COMPACT_DB_RESULT_FIELD3_LEN] = {0};
×
UNCOV
741
  if (pRsp->bAccepted) {
×
UNCOV
742
    STR_TO_VARSTR(result, "accepted");
×
UNCOV
743
    code = colDataSetVal(pResultCol, 0, result, false);
×
UNCOV
744
    TSDB_CHECK_CODE(code, line, END);
×
UNCOV
745
    code = colDataSetVal(pIdCol, 0, (void*)&pRsp->compactId, false);
×
UNCOV
746
    TSDB_CHECK_CODE(code, line, END);
×
UNCOV
747
    STR_TO_VARSTR(reason, "success");
×
UNCOV
748
    code = colDataSetVal(pReasonCol, 0, reason, false);
×
749
    TSDB_CHECK_CODE(code, line, END);
×
750
  } else {
751
    STR_TO_VARSTR(result, "rejected");
×
UNCOV
752
    code = colDataSetVal(pResultCol, 0, result, false);
×
753
    TSDB_CHECK_CODE(code, line, END);
×
754
    colDataSetNULL(pIdCol, 0);
755
    STR_TO_VARSTR(reason, "compaction is ongoing");
×
UNCOV
756
    code = colDataSetVal(pReasonCol, 0, reason, false);
×
UNCOV
757
    TSDB_CHECK_CODE(code, line, END);
×
758
  }
UNCOV
759
  pBlock->info.rows = 1;
×
760

UNCOV
761
  *block = pBlock;
×
762

763
  return TSDB_CODE_SUCCESS;
×
764
END:
×
765
  taosMemoryFree(pBlock);
×
UNCOV
766
  taosArrayDestroy(pBlock->pDataBlock);
×
UNCOV
767
  return code;
×
768
}
769

UNCOV
770
static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetrieveTableRsp** pRsp) {
×
UNCOV
771
  SSDataBlock* pBlock = NULL;
×
772
  int32_t      code = buildCompactDbBlock(pCompactDb, &pBlock);
×
UNCOV
773
  if (code) {
×
UNCOV
774
    return code;
×
775
  }
776

UNCOV
777
  size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
×
UNCOV
778
  size_t rspSize = sizeof(SRetrieveTableRsp) + dataEncodeBufSize + PAYLOAD_PREFIX_LEN;
×
779
  *pRsp = taosMemoryCalloc(1, rspSize);
×
780
  if (NULL == *pRsp) {
×
UNCOV
781
    code = terrno;
×
UNCOV
782
    goto _exit;
×
783
  }
784

UNCOV
785
  (*pRsp)->useconds = 0;
×
UNCOV
786
  (*pRsp)->completed = 1;
×
UNCOV
787
  (*pRsp)->precision = 0;
×
UNCOV
788
  (*pRsp)->compressed = 0;
×
UNCOV
789
  (*pRsp)->compLen = 0;
×
UNCOV
790
  (*pRsp)->payloadLen = 0;
×
UNCOV
791
  (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
×
UNCOV
792
  (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
×
793

794
  int32_t len = blockEncode(pBlock, (*pRsp)->data + PAYLOAD_PREFIX_LEN, dataEncodeBufSize, COMPACT_DB_RESULT_COLS);
×
795
  if (len < 0) {
×
796
    uError("buildRetriveTableRspForCompactDb error, len:%d", len);
×
UNCOV
797
    code = terrno;
×
UNCOV
798
    goto _exit;
×
799
  }
UNCOV
800
  blockDataDestroy(pBlock);
×
801

UNCOV
802
  SET_PAYLOAD_LEN((*pRsp)->data, len, len);
×
803

UNCOV
804
  int32_t payloadLen = len + PAYLOAD_PREFIX_LEN;
×
UNCOV
805
  (*pRsp)->payloadLen = htonl(payloadLen);
×
UNCOV
806
  (*pRsp)->compLen = htonl(payloadLen);
×
807

UNCOV
808
  if (payloadLen != rspSize - sizeof(SRetrieveTableRsp)) {
×
809
    uError("buildRetriveTableRspForCompactDb error, len:%d != rspSize - sizeof(SRetrieveTableRsp):%" PRIu64, len,
×
810
           (uint64_t)(rspSize - sizeof(SRetrieveTableRsp)));
UNCOV
811
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
UNCOV
812
    goto _exit;
×
813
  }
814

815
  return TSDB_CODE_SUCCESS;
×
816
_exit:
×
817
  if (*pRsp) {
×
UNCOV
818
    taosMemoryFree(*pRsp);
×
819
    *pRsp = NULL;
×
820
  }
821
  if (pBlock) {
×
UNCOV
822
    blockDataDestroy(pBlock);
×
823
    pBlock = NULL;
×
824
  }
UNCOV
825
  return code;
×
826
}
827

UNCOV
828
int32_t processCompactDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
×
UNCOV
829
  SRequestObj* pRequest = param;
×
UNCOV
830
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
831
    setErrno(pRequest, code);
×
832
  } else {
UNCOV
833
    SCompactDbRsp      rsp = {0};
×
UNCOV
834
    SRetrieveTableRsp* pRes = NULL;
×
UNCOV
835
    code = tDeserializeSCompactDbRsp(pMsg->pData, pMsg->len, &rsp);
×
UNCOV
836
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
837
      code = buildRetriveTableRspForCompactDb(&rsp, &pRes);
×
838
    }
UNCOV
839
    if (TSDB_CODE_SUCCESS == code) {
×
UNCOV
840
      code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false);
×
841
    }
842

UNCOV
843
    if (code != 0) {
×
UNCOV
844
      taosMemoryFree(pRes);
×
845
    }
846
  }
847

UNCOV
848
  taosMemoryFree(pMsg->pData);
×
UNCOV
849
  taosMemoryFree(pMsg->pEpSet);
×
850

UNCOV
851
  if (pRequest->body.queryFp != NULL) {
×
852
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
853
  } else {
UNCOV
854
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
855
      tscError("failed to post semaphore");
×
856
    }
857
  }
UNCOV
858
  return code;
×
859
}
860

UNCOV
861
static int32_t setCreateStreamFailedRsp(void* param, SDataBuf* pMsg, int32_t code) {
×
UNCOV
862
  if (pMsg) {
×
UNCOV
863
    taosMemoryFree(pMsg->pEpSet);
×
UNCOV
864
    taosMemoryFree(pMsg->pData);
×
865
  }
UNCOV
866
  if (code != 0){
×
UNCOV
867
    tscError("setCreateStreamFailedRsp since %s", tstrerror(code));
×
868
  } else{
UNCOV
869
    tscInfo("setCreateStreamFailedRsp success");
×
870
  }
UNCOV
871
  return code;
×
872
}
873

UNCOV
874
void sendCreateStreamFailedMsg(SRequestObj* pRequest, char* streamName){
×
UNCOV
875
  int32_t code  = 0;
×
UNCOV
876
  tscInfo("send failed stream name to mgmt: %s", streamName);
×
UNCOV
877
  int32_t size = INT_BYTES + strlen(streamName);
×
UNCOV
878
  void *buf = taosMemoryMalloc(size);
×
UNCOV
879
  if (buf == NULL) {
×
UNCOV
880
    tscError("failed to strdup stream name: %s", terrstr());
×
UNCOV
881
    return;
×
882
  }
UNCOV
883
  *(int32_t*)buf = pRequest->code;
×
UNCOV
884
  memcpy(POINTER_SHIFT(buf, INT_BYTES), streamName, strlen(streamName));
×
885

UNCOV
886
  SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
×
UNCOV
887
  if (sendInfo == NULL) {
×
UNCOV
888
    taosMemoryFree(buf);
×
UNCOV
889
    tscError("failed to calloc msgSendInfo: %s", terrstr());
×
UNCOV
890
    return;
×
891
  }
UNCOV
892
  sendInfo->msgInfo = (SDataBuf){.pData = buf, .len = size, .handle = NULL};
×
UNCOV
893
  sendInfo->requestId = generateRequestId();
×
UNCOV
894
  sendInfo->requestObjRefId = 0;
×
UNCOV
895
  sendInfo->msgType = TDMT_MND_FAILED_STREAM;
×
UNCOV
896
  sendInfo->fp = setCreateStreamFailedRsp;
×
897

UNCOV
898
  SEpSet epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
×
UNCOV
899
  code = asyncSendMsgToServer(pRequest->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
×
UNCOV
900
  if (code != 0) {
×
UNCOV
901
    tscError("failed to send failed stream name to mgmt since %s", tstrerror(code));
×
902
  }
903
}
904

UNCOV
905
static void processCreateStreamSecondPhaseRsp(void* param, void* res, int32_t code) {
×
UNCOV
906
  SRequestObj* pRequest = res;
×
UNCOV
907
  if (code != 0 && param != NULL){
×
UNCOV
908
    sendCreateStreamFailedMsg(pRequest, param);
×
909
  }
UNCOV
910
  taosMemoryFree(param);
×
UNCOV
911
  destroyRequest(pRequest);
×
UNCOV
912
}
×
913

UNCOV
914
static char* getStreamName(SRequestObj* pRequest){
×
UNCOV
915
  SCreateStreamStmt* pStmt = (SCreateStreamStmt*)(pRequest->pQuery->pRoot);
×
916
  SName   name;
UNCOV
917
  int32_t code = tNameSetDbName(&name, pRequest->pTscObj->acctId, pStmt->streamName, strlen(pStmt->streamName));
×
UNCOV
918
  if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
919
    tscError("failed to set db name for stream since %s", tstrerror(code));
×
UNCOV
920
    return NULL;
×
921
  } else{
UNCOV
922
    char *streamName = taosMemoryCalloc(1, TSDB_STREAM_FNAME_LEN);
×
UNCOV
923
    (void)tNameGetFullDbName(&name, streamName);
×
UNCOV
924
    return streamName;
×
925
  }
926
}
927

UNCOV
928
void processCreateStreamSecondPhase(SRequestObj* pRequest){
×
UNCOV
929
  tscInfo("[create stream with histroy] create in second phase");
×
UNCOV
930
  char *streamName = getStreamName(pRequest);
×
UNCOV
931
  size_t sqlLen = strlen(pRequest->sqlstr);
×
UNCOV
932
  SRequestObj* pRequestNew = NULL;
×
UNCOV
933
  int32_t code = buildRequest(pRequest->pTscObj->id, pRequest->sqlstr, sqlLen, streamName, false, &pRequestNew, 0);
×
UNCOV
934
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
935
    tscError("[create stream with histroy] create in second phase, build request failed since %s", tstrerror(code));
×
UNCOV
936
    return;
×
937
  }
UNCOV
938
  pRequestNew->source = pRequest->source;
×
UNCOV
939
  pRequestNew->body.queryFp = processCreateStreamSecondPhaseRsp;
×
UNCOV
940
  pRequestNew->streamRunHistory = true;
×
UNCOV
941
  doAsyncQuery(pRequestNew, false);
×
942
}
943

UNCOV
944
int32_t processCreateStreamFirstPhaseRsp(void* param, SDataBuf* pMsg, int32_t code) {
×
UNCOV
945
  SRequestObj* pRequest = param;
×
UNCOV
946
  if (code != TSDB_CODE_SUCCESS) {
×
UNCOV
947
    setErrno(pRequest, code);
×
948
  }
949

UNCOV
950
  if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type)) {
×
UNCOV
951
    if (removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type)) != 0) {
×
UNCOV
952
      tscError("failed to remove meta data for table");
×
953
    }
954
  }
955

UNCOV
956
  taosMemoryFree(pMsg->pData);
×
UNCOV
957
  taosMemoryFree(pMsg->pEpSet);
×
958

UNCOV
959
  if (code == 0 && !pRequest->streamRunHistory && tsStreamRunHistoryAsync){
×
UNCOV
960
    processCreateStreamSecondPhase(pRequest);
×
961
  }
962
  
UNCOV
963
  if (pRequest->body.queryFp != NULL) {
×
UNCOV
964
    pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
×
965
  } else {
UNCOV
966
    if (tsem_post(&pRequest->body.rspSem) != 0) {
×
UNCOV
967
      tscError("failed to post semaphore");
×
968
    }
969
  }
970

UNCOV
971
  return code;
×
972
}
973

974
__async_send_cb_fn_t getMsgRspHandle(int32_t msgType) {
80✔
975
  switch (msgType) {
80!
976
    case TDMT_MND_CONNECT:
36✔
977
      return processConnectRsp;
36✔
978
    case TDMT_MND_CREATE_DB:
15✔
979
      return processCreateDbRsp;
15✔
980
    case TDMT_MND_USE_DB:
6✔
981
      return processUseDbRsp;
6✔
982
    case TDMT_MND_CREATE_STB:
3✔
983
      return processCreateSTableRsp;
3✔
984
    case TDMT_MND_DROP_DB:
18✔
985
      return processDropDbRsp;
18✔
UNCOV
986
    case TDMT_MND_ALTER_STB:
×
UNCOV
987
      return processAlterStbRsp;
×
UNCOV
988
    case TDMT_MND_SHOW_VARIABLES:
×
UNCOV
989
      return processShowVariablesRsp;
×
UNCOV
990
    case TDMT_MND_CREATE_STREAM:
×
UNCOV
991
      return processCreateStreamFirstPhaseRsp;
×
UNCOV
992
    case TDMT_MND_COMPACT_DB:
×
UNCOV
993
      return processCompactDbRsp;
×
994
    default:
2✔
995
      return genericRspCallback;
2✔
996
  }
997
}
998

STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc