• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.5
/source/libs/qcom/src/querymsg.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "query.h"
17
#include "queryInt.h"
18
#include "systable.h"
19
#include "tmsg.h"
20
#include "trpc.h"
21

22
#pragma GCC diagnostic push
23
#ifdef COMPILER_SUPPORTS_CXX13
24
#pragma GCC diagnostic ignored "-Wformat-truncation"
25
#endif
26

27
int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
28
                                   void *(*mallocFp)(int64_t)) = {0};
29
int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0};
30

31
int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
111,955✔
32
  QUERY_PARAM_CHECK(pOut);
111,955!
33
  QUERY_PARAM_CHECK(usedbRsp);
111,955!
34
  memcpy(pOut->db, usedbRsp->db, TSDB_DB_FNAME_LEN);
111,955✔
35
  pOut->dbId = usedbRsp->uid;
111,955✔
36

37
  pOut->dbVgroup = taosMemoryCalloc(1, sizeof(SDBVgInfo));
111,955!
38
  if (NULL == pOut->dbVgroup) {
111,955!
39
    return terrno;
×
40
  }
41

42
  pOut->dbVgroup->vgVersion = usedbRsp->vgVersion;
111,955✔
43
  pOut->dbVgroup->hashMethod = usedbRsp->hashMethod;
111,955✔
44
  pOut->dbVgroup->hashPrefix = usedbRsp->hashPrefix;
111,955✔
45
  pOut->dbVgroup->hashSuffix = usedbRsp->hashSuffix;
111,955✔
46
  pOut->dbVgroup->stateTs = usedbRsp->stateTs;
111,955✔
47

48
  qDebug("db:%s, get %d vgroup, vgVersion:%d, stateTs:%" PRId64, usedbRsp->db, usedbRsp->vgNum, usedbRsp->vgVersion,
111,955✔
49
         usedbRsp->stateTs);
50

51
  if (usedbRsp->vgNum <= 0) {
111,955✔
52
    return TSDB_CODE_SUCCESS;
9,619✔
53
  }
54

55
  pOut->dbVgroup->vgHash =
204,672✔
56
      taosHashInit(usedbRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
102,336✔
57
  if (NULL == pOut->dbVgroup->vgHash) {
102,336!
58
    return terrno;
×
59
  }
60

61
  for (int32_t i = 0; i < usedbRsp->vgNum; ++i) {
370,018✔
62
    SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i);
267,682✔
63
    pOut->dbVgroup->numOfTable += pVgInfo->numOfTable;
267,682✔
64
    qDebug("the %dth vgroup, id:%d, epNum:%d, current:%s port:%u", i, pVgInfo->vgId, pVgInfo->epSet.numOfEps,
267,682✔
65
           pVgInfo->epSet.eps[pVgInfo->epSet.inUse].fqdn, pVgInfo->epSet.eps[pVgInfo->epSet.inUse].port);
66
    if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
267,682!
67
      return terrno;
×
68
    }
69
  }
70

71
  return TSDB_CODE_SUCCESS;
102,336✔
72
}
73

74
int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
118,393✔
75
                                  void *(*mallcFp)(int64_t)) {
76
  QUERY_PARAM_CHECK(input);
118,393!
77
  QUERY_PARAM_CHECK(msg);
118,393!
78
  QUERY_PARAM_CHECK(msgLen);
118,393!
79
  SBuildTableInput *pInput = input;
118,393✔
80

81
  STableInfoReq infoReq = {0};
118,393✔
82
  infoReq.option = pInput->option;
118,393✔
83
  infoReq.header.vgId = pInput->vgId;
118,393✔
84
  if (pInput->dbFName) {
118,393✔
85
    tstrncpy(infoReq.dbFName, pInput->dbFName, TSDB_DB_FNAME_LEN);
118,384✔
86
  }
87
  tstrncpy(infoReq.tbName, pInput->tbName, TSDB_TABLE_NAME_LEN);
118,393✔
88

89
  int32_t bufLen = tSerializeSTableInfoReq(NULL, 0, &infoReq);
118,393✔
90
  void   *pBuf = (*mallcFp)(bufLen);
118,377✔
91
  if (NULL == pBuf) {
118,399!
92
    return terrno;
×
93
  }
94
  int32_t ret = tSerializeSTableInfoReq(pBuf, bufLen, &infoReq);
118,399✔
95
  if (ret < 0) return ret;
118,402!
96

97
  *msg = pBuf;
118,402✔
98
  *msgLen = bufLen;
118,402✔
99

100
  return TSDB_CODE_SUCCESS;
118,402✔
101
}
102

103
int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
105,584✔
104
  QUERY_PARAM_CHECK(input);
105,584!
105
  QUERY_PARAM_CHECK(msg);
105,584!
106
  QUERY_PARAM_CHECK(msgLen);
105,584!
107
  SBuildUseDBInput *pInput = input;
105,584✔
108

109
  SUseDbReq usedbReq = {0};
105,584✔
110
  tstrncpy(usedbReq.db, pInput->db, TSDB_DB_FNAME_LEN);
105,584✔
111
  usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
105,584✔
112
  usedbReq.vgVersion = pInput->vgVersion;
105,584✔
113
  usedbReq.dbId = pInput->dbId;
105,584✔
114
  usedbReq.numOfTable = pInput->numOfTable;
105,584✔
115
  usedbReq.stateTs = pInput->stateTs;
105,584✔
116

117
  int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
105,584✔
118
  void   *pBuf = (*mallcFp)(bufLen);
105,583✔
119
  if (NULL == pBuf) {
105,583!
120
    return terrno;
×
121
  }
122
  int32_t ret = tSerializeSUseDbReq(pBuf, bufLen, &usedbReq);
105,583✔
123
  if (ret < 0) return ret;
105,584!
124

125
  *msg = pBuf;
105,584✔
126
  *msgLen = bufLen;
105,584✔
127

128
  return TSDB_CODE_SUCCESS;
105,584✔
129
}
130

131
int32_t queryBuildQnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
10,557✔
132
  QUERY_PARAM_CHECK(msg);
10,557!
133
  QUERY_PARAM_CHECK(msgLen);
10,557!
134

135
  SQnodeListReq qnodeListReq = {0};
10,557✔
136
  qnodeListReq.rowNum = -1;
10,557✔
137

138
  int32_t bufLen = tSerializeSQnodeListReq(NULL, 0, &qnodeListReq);
10,557✔
139
  void   *pBuf = (*mallcFp)(bufLen);
10,557✔
140
  if (NULL == pBuf) {
10,557!
141
    return terrno;
×
142
  }
143

144
  int32_t ret = tSerializeSQnodeListReq(pBuf, bufLen, &qnodeListReq);
10,557✔
145
  if (ret < 0) return ret;
10,557!
146

147
  *msg = pBuf;
10,557✔
148
  *msgLen = bufLen;
10,557✔
149

150
  return TSDB_CODE_SUCCESS;
10,557✔
151
}
152

153
int32_t queryBuildDnodeListMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
345✔
154
  QUERY_PARAM_CHECK(msg);
345!
155
  QUERY_PARAM_CHECK(msgLen);
345!
156

157
  SDnodeListReq dnodeListReq = {0};
345✔
158
  dnodeListReq.rowNum = -1;
345✔
159

160
  int32_t bufLen = tSerializeSDnodeListReq(NULL, 0, &dnodeListReq);
345✔
161
  void   *pBuf = (*mallcFp)(bufLen);
345✔
162
  if (NULL == pBuf) {
345!
163
    return terrno;
×
164
  }
165
  int32_t ret = tSerializeSDnodeListReq(pBuf, bufLen, &dnodeListReq);
345✔
166
  if (ret < 0) return ret;
345!
167

168
  *msg = pBuf;
345✔
169
  *msgLen = bufLen;
345✔
170

171
  return TSDB_CODE_SUCCESS;
345✔
172
}
173

174
int32_t queryBuildGetSerVerMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
1✔
175
  QUERY_PARAM_CHECK(msg);
1!
176
  QUERY_PARAM_CHECK(msgLen);
1!
177

178
  SServerVerReq req = {0};
1✔
179

180
  int32_t bufLen = tSerializeSServerVerReq(NULL, 0, &req);
1✔
181
  void   *pBuf = (*mallcFp)(bufLen);
1✔
182
  if (NULL == pBuf) {
1!
183
    return terrno;
×
184
  }
185
  int32_t ret = tSerializeSServerVerReq(pBuf, bufLen, &req);
1✔
186
  if (ret < 0) return ret;
1!
187

188
  *msg = pBuf;
1✔
189
  *msgLen = bufLen;
1✔
190

191
  return TSDB_CODE_SUCCESS;
1✔
192
}
193

194
int32_t queryBuildGetDBCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
3,969✔
195
  QUERY_PARAM_CHECK(input);
3,969!
196
  QUERY_PARAM_CHECK(msg);
3,969!
197
  QUERY_PARAM_CHECK(msgLen);
3,969!
198

199
  SDbCfgReq dbCfgReq = {0};
3,969✔
200
  tstrncpy(dbCfgReq.db, input, TSDB_DB_FNAME_LEN);
3,969✔
201

202
  int32_t bufLen = tSerializeSDbCfgReq(NULL, 0, &dbCfgReq);
3,969✔
203
  void   *pBuf = (*mallcFp)(bufLen);
3,969✔
204
  if (NULL == pBuf) {
3,969!
205
    return terrno;
×
206
  }
207
  int32_t ret = tSerializeSDbCfgReq(pBuf, bufLen, &dbCfgReq);
3,969✔
208
  if (ret < 0) return ret;
3,969!
209

210
  *msg = pBuf;
3,969✔
211
  *msgLen = bufLen;
3,969✔
212

213
  return TSDB_CODE_SUCCESS;
3,969✔
214
}
215

216
int32_t queryBuildGetIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
1✔
217
  QUERY_PARAM_CHECK(input);
1!
218
  QUERY_PARAM_CHECK(msg);
1!
219
  QUERY_PARAM_CHECK(msgLen);
1!
220

221
  SUserIndexReq indexReq = {0};
1✔
222
  tstrncpy(indexReq.indexFName, input, TSDB_INDEX_FNAME_LEN);
1✔
223

224
  int32_t bufLen = tSerializeSUserIndexReq(NULL, 0, &indexReq);
1✔
225
  void   *pBuf = (*mallcFp)(bufLen);
1✔
226
  if (NULL == pBuf) {
1!
227
    return terrno;
×
228
  }
229
  int32_t ret = tSerializeSUserIndexReq(pBuf, bufLen, &indexReq);
1✔
230
  if (ret < 0) return ret;
1!
231

232
  *msg = pBuf;
1✔
233
  *msgLen = bufLen;
1✔
234

235
  return TSDB_CODE_SUCCESS;
1✔
236
}
237

238
int32_t queryBuildRetrieveFuncMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
97✔
239
                                  void *(*mallcFp)(int64_t)) {
240
  QUERY_PARAM_CHECK(input);
97!
241
  QUERY_PARAM_CHECK(msg);
97!
242
  QUERY_PARAM_CHECK(msgLen);
97!
243

244
  SRetrieveFuncReq funcReq = {0};
97✔
245
  funcReq.numOfFuncs = 1;
97✔
246
  funcReq.ignoreCodeComment = true;
97✔
247
  funcReq.pFuncNames = taosArrayInit(1, strlen(input) + 1);
97✔
248
  if (NULL == funcReq.pFuncNames) {
97!
249
    return terrno;
×
250
  }
251
  if (taosArrayPush(funcReq.pFuncNames, input) == NULL) {
194!
252
    taosArrayDestroy(funcReq.pFuncNames);
×
253
    return terrno;
×
254
  }
255

256
  int32_t bufLen = tSerializeSRetrieveFuncReq(NULL, 0, &funcReq);
97✔
257
  void   *pBuf = (*mallcFp)(bufLen);
97✔
258
  if (NULL == pBuf) {
97!
259
    taosArrayDestroy(funcReq.pFuncNames);
×
260
    return terrno;
×
261
  }
262
  int32_t ret = tSerializeSRetrieveFuncReq(pBuf, bufLen, &funcReq);
97✔
263
  if (ret < 0) {
97!
264
    taosArrayDestroy(funcReq.pFuncNames);
×
265
    return ret;
×
266
  }
267

268
  taosArrayDestroy(funcReq.pFuncNames);
97✔
269

270
  *msg = pBuf;
97✔
271
  *msgLen = bufLen;
97✔
272

273
  return TSDB_CODE_SUCCESS;
97✔
274
}
275

276
int32_t queryBuildGetUserAuthMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
20,299✔
277
  QUERY_PARAM_CHECK(input);
20,299!
278
  QUERY_PARAM_CHECK(msg);
20,299!
279
  QUERY_PARAM_CHECK(msgLen);
20,299!
280

281
  SGetUserAuthReq req = {0};
20,299✔
282
  tstrncpy(req.user, input, TSDB_USER_LEN);
20,299✔
283

284
  int32_t bufLen = tSerializeSGetUserAuthReq(NULL, 0, &req);
20,299✔
285
  void   *pBuf = (*mallcFp)(bufLen);
20,298✔
286
  if (NULL == pBuf) {
20,298!
287
    return terrno;
×
288
  }
289
  int32_t ret = tSerializeSGetUserAuthReq(pBuf, bufLen, &req);
20,298✔
290
  if (ret < 0) return ret;
20,299!
291

292
  *msg = pBuf;
20,299✔
293
  *msgLen = bufLen;
20,299✔
294

295
  return TSDB_CODE_SUCCESS;
20,299✔
296
}
297

298
int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
7,438✔
299
  QUERY_PARAM_CHECK(input);
7,438!
300
  QUERY_PARAM_CHECK(msg);
7,438!
301
  QUERY_PARAM_CHECK(msgLen);
7,438!
302

303
  STableIndexReq indexReq = {0};
7,438✔
304
  tstrncpy(indexReq.tbFName, input, TSDB_TABLE_FNAME_LEN);
7,438✔
305

306
  int32_t bufLen = tSerializeSTableIndexReq(NULL, 0, &indexReq);
7,438✔
307
  void   *pBuf = (*mallcFp)(bufLen);
7,438✔
308
  if (NULL == pBuf) {
7,438!
309
    return terrno;
×
310
  }
311
  int32_t ret = tSerializeSTableIndexReq(pBuf, bufLen, &indexReq);
7,438✔
312
  if (ret < 0) return ret;
7,438!
313

314
  *msg = pBuf;
7,438✔
315
  *msgLen = bufLen;
7,438✔
316

317
  return TSDB_CODE_SUCCESS;
7,438✔
318
}
319

320
int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
195✔
321
  QUERY_PARAM_CHECK(input);
195!
322
  QUERY_PARAM_CHECK(msg);
195!
323
  QUERY_PARAM_CHECK(msgLen);
195!
324

325
  SBuildTableInput *pInput = input;
195✔
326
  STableCfgReq      cfgReq = {0};
195✔
327
  cfgReq.header.vgId = pInput->vgId;
195✔
328
  tstrncpy(cfgReq.dbFName, pInput->dbFName, TSDB_DB_FNAME_LEN);
195✔
329
  tstrncpy(cfgReq.tbName, pInput->tbName, TSDB_TABLE_NAME_LEN);
195✔
330

331
  int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq);
195✔
332
  void   *pBuf = (*mallcFp)(bufLen);
195✔
333
  if (NULL == pBuf) {
195!
334
    return terrno;
×
335
  }
336
  int32_t ret = tSerializeSTableCfgReq(pBuf, bufLen, &cfgReq);
195✔
337
  if (ret < 0) return ret;
195!
338

339
  *msg = pBuf;
195✔
340
  *msgLen = bufLen;
195✔
341

342
  return TSDB_CODE_SUCCESS;
195✔
343
}
344

345
int32_t queryBuildGetViewMetaMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void *(*mallcFp)(int64_t)) {
38,082✔
346
  QUERY_PARAM_CHECK(input);
38,082!
347
  QUERY_PARAM_CHECK(msg);
38,082!
348
  QUERY_PARAM_CHECK(msgLen);
38,082!
349

350
  SViewMetaReq req = {0};
38,082✔
351
  tstrncpy(req.fullname, input, TSDB_VIEW_FNAME_LEN);
38,082✔
352

353
  int32_t bufLen = tSerializeSViewMetaReq(NULL, 0, &req);
38,082✔
354
  void   *pBuf = (*mallcFp)(bufLen);
38,080✔
355
  if (NULL == pBuf) {
38,081!
356
    return terrno;
×
357
  }
358
  int32_t ret = tSerializeSViewMetaReq(pBuf, bufLen, &req);
38,081✔
359
  if (ret < 0) return ret;
38,081!
360

361
  *msg = pBuf;
38,081✔
362
  *msgLen = bufLen;
38,081✔
363

364
  return TSDB_CODE_SUCCESS;
38,081✔
365
}
366

367
int32_t queryBuildGetTableTSMAMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
1,051✔
368
                                  void *(*mallcFp)(int64_t)) {
369
  QUERY_PARAM_CHECK(input);
1,051!
370
  QUERY_PARAM_CHECK(msg);
1,051!
371
  QUERY_PARAM_CHECK(msgLen);
1,051!
372

373
  STableTSMAInfoReq req = {0};
1,051✔
374
  tstrncpy(req.name, input, TSDB_TABLE_FNAME_LEN);
1,051✔
375

376
  int32_t bufLen = tSerializeTableTSMAInfoReq(NULL, 0, &req);
1,051✔
377
  void *  pBuf = (*mallcFp)(bufLen);
1,051✔
378
  if (NULL == pBuf) {
1,051!
379
    return terrno;
×
380
  }
381
  int32_t ret = tSerializeTableTSMAInfoReq(pBuf, bufLen, &req);
1,051✔
382
  if (ret < 0) return ret;
1,051!
383

384
  *msg = pBuf;
1,051✔
385
  *msgLen = bufLen;
1,051✔
386
  return TSDB_CODE_SUCCESS;
1,051✔
387
}
388

389
int32_t queryBuildGetTSMAMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen,
76✔
390
                                  void *(*mallcFp)(int64_t)) {
391
  QUERY_PARAM_CHECK(input);
76!
392
  QUERY_PARAM_CHECK(msg);
76!
393
  QUERY_PARAM_CHECK(msgLen);
76!
394

395
  STableTSMAInfoReq req = {0};
76✔
396
  req.fetchingWithTsmaName = true;
76✔
397
  tstrncpy(req.name, input, TSDB_TABLE_FNAME_LEN);
76✔
398

399
  int32_t bufLen = tSerializeTableTSMAInfoReq(NULL, 0, &req);
76✔
400
  void *  pBuf = (*mallcFp)(bufLen);
76✔
401
  if(pBuf == NULL)
76!
402
  {
403
    return terrno;
×
404
  }
405
  int32_t ret = tSerializeTableTSMAInfoReq(pBuf, bufLen, &req);
76✔
406
  if(ret < 0) return ret;
76!
407

408
  *msg = pBuf;
76✔
409
  *msgLen = bufLen;
76✔
410
  return TSDB_CODE_SUCCESS;
76✔
411
}
412

413
int32_t queryBuildGetStreamProgressMsg(void* input, char** msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int64_t)) {
4,868✔
414
  QUERY_PARAM_CHECK(input);
4,868!
415
  QUERY_PARAM_CHECK(msg);
4,868!
416
  QUERY_PARAM_CHECK(msgLen);
4,868!
417

418
  int32_t len = tSerializeStreamProgressReq(NULL, 0, input);
4,868✔
419
  void* pBuf = (*mallcFp)(len);
4,868✔
420
  if (NULL == pBuf) {
4,868!
421
    return terrno;
×
422
  }
423

424
  int32_t ret = tSerializeStreamProgressReq(pBuf, len, input);
4,868✔
425
  if (ret < 0) return ret;
4,868!
426

427
  *msg = pBuf;
4,868✔
428
  *msgLen = len;
4,868✔
429
  return TSDB_CODE_SUCCESS;
4,868✔
430
}
431

432

NEW
433
int32_t queryBuildVSubTablesMsg(void* input, char** msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int64_t)) {
×
NEW
434
  QUERY_PARAM_CHECK(input);
×
NEW
435
  QUERY_PARAM_CHECK(msg);
×
NEW
436
  QUERY_PARAM_CHECK(msgLen);
×
437

NEW
438
  SVSubTablesReq req = {0};
×
NEW
439
  req.suid = *(int64_t*)input;
×
440

NEW
441
  int32_t bufLen = tSerializeSVSubTablesReq(NULL, 0, &req);
×
NEW
442
  void   *pBuf = (*mallcFp)(bufLen);
×
NEW
443
  if (NULL == pBuf) {
×
NEW
444
    return terrno;
×
445
  }
NEW
446
  if(tSerializeSVSubTablesReq(pBuf, bufLen, &req) < 0)   {
×
NEW
447
    return TSDB_CODE_TSC_INVALID_INPUT;
×
448
  }
449

NEW
450
  *msg = pBuf;
×
NEW
451
  *msgLen = bufLen;
×
452

NEW
453
  return TSDB_CODE_SUCCESS;
×
454
}
455

456

457
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
104,344✔
458
  SUseDbOutput *pOut = output;
104,344✔
459
  SUseDbRsp     usedbRsp = {0};
104,344✔
460
  int32_t       code = -1;
104,344✔
461

462
  if (NULL == output || NULL == msg || msgSize <= 0) {
104,344!
463
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
464
    qError("invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
465
    goto PROCESS_USEDB_OVER;
×
466
  }
467

468
  if (tDeserializeSUseDbRsp(msg, msgSize, &usedbRsp) != 0) {
104,344!
469
    qError("invalid use db rsp msg, msgSize:%d", msgSize);
×
470
    code = TSDB_CODE_INVALID_MSG;
×
471
    goto PROCESS_USEDB_OVER;
×
472
  }
473

474
  if (usedbRsp.vgNum < 0) {
104,344!
475
    qError("invalid db[%s] vgroup number[%d]", usedbRsp.db, usedbRsp.vgNum);
×
476
    code = TSDB_CODE_TSC_INVALID_VALUE;
×
477
    goto PROCESS_USEDB_OVER;
×
478
  }
479

480
  qTrace("db:%s, usedbRsp received, numOfVgroups:%d", usedbRsp.db, usedbRsp.vgNum);
104,344✔
481
  for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
364,866✔
482
    SVgroupInfo *pInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
260,522✔
483
    qTrace("vgId:%d, numOfEps:%d inUse:%d ", pInfo->vgId, pInfo->epSet.numOfEps, pInfo->epSet.inUse);
260,522✔
484
    for (int32_t j = 0; j < pInfo->epSet.numOfEps; ++j) {
583,999✔
485
      qTrace("vgId:%d, index:%d epset:%s:%u", pInfo->vgId, j, pInfo->epSet.eps[j].fqdn, pInfo->epSet.eps[j].port);
323,477✔
486
    }
487
  }
488

489
  code = queryBuildUseDbOutput(pOut, &usedbRsp);
104,344✔
490

491
PROCESS_USEDB_OVER:
104,344✔
492

493
  if (code != 0) {
104,344!
494
    if (pOut) {
×
495
      if (pOut->dbVgroup) taosHashCleanup(pOut->dbVgroup->vgHash);
×
496
      taosMemoryFreeClear(pOut->dbVgroup);
×
497
    }
498
    qError("failed to process usedb rsp since %s", terrstr());
×
499
  }
500

501
  tFreeSUsedbRsp(&usedbRsp);
104,344✔
502
  return code;
104,344✔
503
}
504

505
static int32_t queryConvertTableMetaMsg(STableMetaRsp *pMetaMsg) {
82,975✔
506
  QUERY_PARAM_CHECK(pMetaMsg);
82,975!
507
  if (pMetaMsg->numOfTags < 0 || pMetaMsg->numOfTags > TSDB_MAX_TAGS) {
82,975!
508
    qError("invalid numOfTags[%d] in table meta rsp msg", pMetaMsg->numOfTags);
7!
509
    return TSDB_CODE_TSC_INVALID_VALUE;
×
510
  }
511

512
  if (pMetaMsg->numOfColumns > TSDB_MAX_COLUMNS || pMetaMsg->numOfColumns <= 0) {
82,968!
UNCOV
513
    qError("invalid numOfColumns[%d] in table meta rsp msg", pMetaMsg->numOfColumns);
×
514
    return TSDB_CODE_TSC_INVALID_VALUE;
×
515
  }
516

517
  if (pMetaMsg->tableType != TSDB_SUPER_TABLE && pMetaMsg->tableType != TSDB_CHILD_TABLE &&
82,973✔
518
      pMetaMsg->tableType != TSDB_NORMAL_TABLE && pMetaMsg->tableType != TSDB_SYSTEM_TABLE &&
19,209✔
519
      pMetaMsg->tableType != TSDB_VIRTUAL_NORMAL_TABLE && pMetaMsg->tableType != TSDB_VIRTUAL_CHILD_TABLE) {
40!
520
    qError("invalid tableType[%d] in table meta rsp msg", pMetaMsg->tableType);
×
UNCOV
521
    return TSDB_CODE_TSC_INVALID_VALUE;
×
522
  }
523

524
  if (pMetaMsg->sversion < 0) {
82,973!
525
    qError("invalid sversion[%d] in table meta rsp msg", pMetaMsg->sversion);
×
UNCOV
526
    return TSDB_CODE_TSC_INVALID_VALUE;
×
527
  }
528

529
  if (pMetaMsg->tversion < 0) {
82,973!
530
    qError("invalid tversion[%d] in table meta rsp msg", pMetaMsg->tversion);
×
UNCOV
531
    return TSDB_CODE_TSC_INVALID_VALUE;
×
532
  }
533

534
  if (pMetaMsg->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
82,973!
535
    qError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", pMetaMsg->pSchemas[0].colId);
×
UNCOV
536
    return TSDB_CODE_TSC_INVALID_VALUE;
×
537
  }
538

539
  return TSDB_CODE_SUCCESS;
82,973✔
540
}
541

542
int32_t queryCreateCTableMetaFromMsg(STableMetaRsp *msg, SCTableMeta *pMeta) {
123,308✔
543
  QUERY_PARAM_CHECK(msg);
123,308!
544
  QUERY_PARAM_CHECK(pMeta);
123,308!
545
  pMeta->vgId = msg->vgId;
123,308✔
546
  pMeta->tableType = msg->tableType;
123,308✔
547
  pMeta->uid = msg->tuid;
123,308✔
548
  pMeta->suid = msg->suid;
123,308✔
549

550
  qDebug("ctb:%s, uid:0x%" PRIx64 " meta returned, type:%d vgId:%d db:%s suid:%" PRIx64, msg->tbName, pMeta->uid,
123,308✔
551
         pMeta->tableType, pMeta->vgId, msg->dbFName, pMeta->suid);
552

553
  return TSDB_CODE_SUCCESS;
123,310✔
554
}
555

556
int32_t queryCreateVCTableMetaFromMsg(STableMetaRsp *msg, SVCTableMeta **pMeta) {
129✔
557
  QUERY_PARAM_CHECK(msg);
129!
558
  QUERY_PARAM_CHECK(pMeta);
129!
559
  QUERY_PARAM_CHECK(msg->pColRefs);
129!
560

561
  int32_t pColRefSize = sizeof(SColRef) * msg->numOfColRefs;
129✔
562

563
  SVCTableMeta *pTableMeta = taosMemoryCalloc(1, sizeof(SVCTableMeta) + pColRefSize);
129!
564
  if (NULL == pTableMeta) {
129!
565
    qError("calloc size[%d] failed", (int32_t)sizeof(SVCTableMeta) + pColRefSize);
×
UNCOV
566
    return terrno;
×
567
  }
568

569
  pTableMeta->vgId = msg->vgId;
129✔
570
  pTableMeta->tableType = msg->tableType;
129✔
571
  pTableMeta->uid = msg->tuid;
129✔
572
  pTableMeta->suid = msg->suid;
129✔
573
  pTableMeta->numOfColRefs = msg->numOfColRefs;
129✔
574

575
  pTableMeta->colRef = (SColRef *)((char *)pTableMeta + sizeof(SVCTableMeta));
129✔
576
  memcpy(pTableMeta->colRef, msg->pColRefs, pColRefSize);
129✔
577

578
  qDebug("ctable %s uid %" PRIx64 " meta returned, type %d vgId:%d db %s suid %" PRIx64, msg->tbName, (pTableMeta)->uid,
129!
579
         (pTableMeta)->tableType, (pTableMeta)->vgId, msg->dbFName, (pTableMeta)->suid);
580

581
  *pMeta = pTableMeta;
129✔
582
  return TSDB_CODE_SUCCESS;
129✔
583
}
584

585
int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta **pMeta) {
104,782✔
586
  QUERY_PARAM_CHECK(msg);
104,782!
587
  QUERY_PARAM_CHECK(pMeta);
104,782!
588
  int32_t total = msg->numOfColumns + msg->numOfTags;
104,782✔
589
  int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
104,782✔
590
  int32_t schemaExtSize = (withExtSchema(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0;
104,782✔
591
  int32_t pColRefSize = (hasRefCol(msg->tableType) && msg->pColRefs && !isStb) ? sizeof(SColRef) * msg->numOfColRefs : 0;
104,781!
592

593
  STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + pColRefSize);
104,778!
594
  if (NULL == pTableMeta) {
104,777!
UNCOV
595
    qError("calloc size[%d] failed", metaSize);
×
UNCOV
596
    return terrno;
×
597
  }
598
  SSchemaExt *pSchemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize);
104,777✔
599
  SColRef    *pColRef = (SColRef *)((char *)pTableMeta + metaSize + schemaExtSize);
104,777✔
600

601
  pTableMeta->vgId = isStb ? 0 : msg->vgId;
104,777✔
602
  pTableMeta->tableType = isStb ? TSDB_SUPER_TABLE : msg->tableType;
104,777✔
603
  pTableMeta->uid = isStb ? msg->suid : msg->tuid;
104,777✔
604
  pTableMeta->suid = msg->suid;
104,777✔
605
  pTableMeta->sversion = msg->sversion;
104,777✔
606
  pTableMeta->tversion = msg->tversion;
104,777✔
607
  pTableMeta->virtualStb = msg->virtualStb;
104,777✔
608
  pTableMeta->numOfColRefs = msg->numOfColRefs;
104,777✔
609

610
  pTableMeta->tableInfo.numOfTags = msg->numOfTags;
104,777✔
611
  pTableMeta->tableInfo.precision = msg->precision;
104,777✔
612
  pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;
104,777✔
613

614
  memcpy(pTableMeta->schema, msg->pSchemas, sizeof(SSchema) * total);
104,777✔
615
  if (withExtSchema(msg->tableType) && msg->pSchemaExt) {
104,777✔
616
    pTableMeta->schemaExt = pSchemaExt;
102,368✔
617
    memcpy(pSchemaExt, msg->pSchemaExt, schemaExtSize);
102,368✔
618
  } else {
619
    pTableMeta->schemaExt = NULL;
2,413✔
620
  }
621

622
  if (hasRefCol(msg->tableType) && msg->pColRefs && !isStb) {
104,781!
623
    pTableMeta->colRef = (SColRef *)((char *)pTableMeta + metaSize + schemaExtSize);
477✔
624
    memcpy(pTableMeta->colRef, msg->pColRefs, pColRefSize);
477✔
625
  } else {
626
    pTableMeta->colRef = NULL;
104,304✔
627
  }
628

629
  bool hasPK = (msg->numOfColumns > 1) && (pTableMeta->schema[1].flags & COL_IS_KEY);
104,781!
630
  for (int32_t i = 0; i < msg->numOfColumns; ++i) {
2,261,699✔
631
    pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
2,156,918✔
632
    if (hasPK && (i > 0)) {
2,156,918✔
633
      if ((pTableMeta->schema[i].flags & COL_IS_KEY)) {
78,886✔
634
        ++pTableMeta->tableInfo.numOfPKs;
39,453✔
635
      } else {
636
        hasPK = false;
39,433✔
637
      }
638
    }
639
  }
640

641
  qDebug("tb:%s, uid:%" PRIx64 " meta returned, type:%d vgId:%d db:%s stb:%s suid:%" PRIx64
104,781✔
642
         " sver:%d tver:%d"
643
         " tagNum:%d colNum:%d precision:%d rowSize:%d",
644
         msg->tbName, pTableMeta->uid, pTableMeta->tableType, pTableMeta->vgId, msg->dbFName, msg->stbName,
645
         pTableMeta->suid, pTableMeta->sversion, pTableMeta->tversion, pTableMeta->tableInfo.numOfTags,
646
         pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.precision, pTableMeta->tableInfo.rowSize);
647

648
  *pMeta = pTableMeta;
104,777✔
649
  return TSDB_CODE_SUCCESS;
104,777✔
650
}
651

652
int32_t queryCreateTableMetaExFromMsg(STableMetaRsp *msg, bool isStb, STableMeta **pMeta) {
1,230✔
653
  QUERY_PARAM_CHECK(msg);
1,230!
654
  QUERY_PARAM_CHECK(pMeta);
1,230!
655
  int32_t total = msg->numOfColumns + msg->numOfTags;
1,230✔
656
  int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total;
1,230✔
657
  int32_t schemaExtSize = (withExtSchema(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0;
1,230!
658
  int32_t pColRefSize = (hasRefCol(msg->tableType) && msg->pColRefs) ? sizeof(SColRef) * msg->numOfColRefs : 0;
1,230!
659
  int32_t tbNameSize = strlen(msg->tbName) + 1;
1,230✔
660

661
  STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize + pColRefSize + tbNameSize);
1,230!
662
  if (NULL == pTableMeta) {
1,230!
UNCOV
663
    qError("calloc size[%d] failed", metaSize);
×
UNCOV
664
    return terrno;
×
665
  }
666
  SSchemaExt *pSchemaExt = (SSchemaExt *)((char *)pTableMeta + metaSize);
1,230✔
667
  SColRef    *pColRef = (SColRef *)((char *)pTableMeta + metaSize + schemaExtSize);
1,230✔
668

669
  pTableMeta->vgId = isStb ? 0 : msg->vgId;
1,230✔
670
  pTableMeta->tableType = isStb ? TSDB_SUPER_TABLE : msg->tableType;
1,230✔
671
  pTableMeta->uid = isStb ? msg->suid : msg->tuid;
1,230✔
672
  pTableMeta->suid = msg->suid;
1,230✔
673
  pTableMeta->sversion = msg->sversion;
1,230✔
674
  pTableMeta->tversion = msg->tversion;
1,230✔
675
  pTableMeta->virtualStb = msg->virtualStb;
1,230✔
676
  pTableMeta->numOfColRefs = msg->numOfColRefs;
1,230✔
677

678
  pTableMeta->tableInfo.numOfTags = msg->numOfTags;
1,230✔
679
  pTableMeta->tableInfo.precision = msg->precision;
1,230✔
680
  pTableMeta->tableInfo.numOfColumns = msg->numOfColumns;
1,230✔
681

682
  TAOS_MEMCPY(pTableMeta->schema, msg->pSchemas, sizeof(SSchema) * total);
1,230✔
683
  if (withExtSchema(msg->tableType) && msg->pSchemaExt) {
1,230!
684
    pTableMeta->schemaExt = pSchemaExt;
1,230✔
685
    TAOS_MEMCPY(pSchemaExt, msg->pSchemaExt, schemaExtSize);
1,230✔
686
  } else {
UNCOV
687
    pTableMeta->schemaExt = NULL;
×
688
  }
689

690
  if (hasRefCol(msg->tableType) && msg->pColRefs) {
1,230!
UNCOV
691
    pTableMeta->colRef = (SColRef *)((char *)pTableMeta + metaSize + schemaExtSize);
×
UNCOV
692
    memcpy(pTableMeta->colRef, msg->pColRefs, pColRefSize);
×
693
  } else {
694
    pTableMeta->colRef = NULL;
1,230✔
695
  }
696

697
  bool hasPK = (msg->numOfColumns > 1) && (pTableMeta->schema[1].flags & COL_IS_KEY);
1,230!
698
  for (int32_t i = 0; i < msg->numOfColumns; ++i) {
4,000✔
699
    pTableMeta->tableInfo.rowSize += pTableMeta->schema[i].bytes;
2,770✔
700
    if (hasPK && (i > 0)) {
2,770!
UNCOV
701
      if ((pTableMeta->schema[i].flags & COL_IS_KEY)) {
×
UNCOV
702
        ++pTableMeta->tableInfo.numOfPKs;
×
703
      } else {
UNCOV
704
        hasPK = false;
×
705
      }
706
    }
707
  }
708

709
  char *pTbName = (char *)pTableMeta + metaSize + schemaExtSize + pColRefSize;
1,230✔
710
  tstrncpy(pTbName, msg->tbName, tbNameSize);
1,230✔
711

712
  qDebug("tb:%s, uid:%" PRIx64 " meta returned, type:%d vgId:%d db:%s stb:%s suid:%" PRIx64
1,230!
713
         " sver:%d tver:%d"
714
         " tagNum:%d colNum:%d precision:%d rowSize:%d",
715
         msg->tbName, pTableMeta->uid, pTableMeta->tableType, pTableMeta->vgId, msg->dbFName, msg->stbName,
716
         pTableMeta->suid, pTableMeta->sversion, pTableMeta->tversion, pTableMeta->tableInfo.numOfTags,
717
         pTableMeta->tableInfo.numOfColumns, pTableMeta->tableInfo.precision, pTableMeta->tableInfo.rowSize);
718

719
  *pMeta = pTableMeta;
1,230✔
720
  return TSDB_CODE_SUCCESS;
1,230✔
721
}
722

723
int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
81,745✔
724
  int32_t       code = 0;
81,745✔
725
  STableMetaRsp metaRsp = {0};
81,745✔
726

727
  if (NULL == output || NULL == msg || msgSize <= 0) {
81,745!
728
    qError("queryProcessTableMetaRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
1!
UNCOV
729
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
UNCOV
730
    goto PROCESS_META_OVER;
×
731
  }
732

733
  if (tDeserializeSTableMetaRsp(msg, msgSize, &metaRsp) != 0) {
81,744!
734
    code = TSDB_CODE_INVALID_MSG;
×
735
    goto PROCESS_META_OVER;
×
736
  }
737

738
  code = queryConvertTableMetaMsg(&metaRsp);
81,742✔
739
  if (code != TSDB_CODE_SUCCESS) {
81,734!
740
    goto PROCESS_META_OVER;
×
741
  }
742

743
  if (!IS_SYS_DBNAME(metaRsp.dbFName) &&
81,734!
744
      !tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) {
79,816!
745
    code = TSDB_CODE_TSC_INVALID_VALUE;
×
UNCOV
746
    goto PROCESS_META_OVER;
×
747
  }
748

749
  STableMetaOutput *pOut = output;
81,738✔
750
  tstrncpy(pOut->dbFName, metaRsp.dbFName, TSDB_DB_FNAME_LEN);
81,738✔
751
  pOut->dbId = metaRsp.dbId;
81,738✔
752

753
  if (metaRsp.tableType == TSDB_CHILD_TABLE) {
81,738✔
754
    SET_META_TYPE_BOTH_TABLE(pOut->metaType);
48,065✔
755

756
    tstrncpy(pOut->ctbName, metaRsp.tbName, TSDB_TABLE_NAME_LEN);
48,065✔
757
    tstrncpy(pOut->tbName, metaRsp.stbName, TSDB_TABLE_NAME_LEN);
48,065✔
758

759
    pOut->ctbMeta.vgId = metaRsp.vgId;
48,065✔
760
    pOut->ctbMeta.tableType = metaRsp.tableType;
48,065✔
761
    pOut->ctbMeta.uid = metaRsp.tuid;
48,065✔
762
    pOut->ctbMeta.suid = metaRsp.suid;
48,065✔
763

764
    code = queryCreateTableMetaFromMsg(&metaRsp, true, &pOut->tbMeta);
48,065✔
765
  } else if (metaRsp.tableType == TSDB_VIRTUAL_CHILD_TABLE) {
33,673✔
766
    SET_META_TYPE_BOTH_VTABLE(pOut->metaType);
20✔
767

768
    tstrncpy(pOut->ctbName, metaRsp.tbName, TSDB_TABLE_NAME_LEN);
20✔
769
    tstrncpy(pOut->tbName, metaRsp.stbName, TSDB_TABLE_NAME_LEN);
20✔
770

771
    code = queryCreateVCTableMetaFromMsg(&metaRsp, &pOut->vctbMeta);
20✔
772
    if (TSDB_CODE_SUCCESS != code) {
20!
UNCOV
773
      goto PROCESS_META_OVER;
×
774
    }
775
    code = queryCreateTableMetaFromMsg(&metaRsp, true, &pOut->tbMeta);
20✔
776
  } else {
777
    SET_META_TYPE_TABLE(pOut->metaType);
33,653✔
778
    tstrncpy(pOut->tbName, metaRsp.tbName, TSDB_TABLE_NAME_LEN);
33,653✔
779
    code = queryCreateTableMetaFromMsg(&metaRsp, (metaRsp.tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
33,653✔
780
  }
781

782
PROCESS_META_OVER:
81,740✔
783
  if (code != 0) {
81,740!
UNCOV
784
    qError("failed to process table meta rsp since %s", tstrerror(code));
×
785
  }
786

787
  tFreeSTableMetaRsp(&metaRsp);
81,740✔
788
  return code;
81,739✔
789
}
790

791
static int32_t queryProcessTableNameRsp(void *output, char *msg, int32_t msgSize) {
1,230✔
792
  int32_t       code = 0;
1,230✔
793
  STableMetaRsp metaRsp = {0};
1,230✔
794

795
  if (NULL == output || NULL == msg || msgSize <= 0) {
1,230!
UNCOV
796
    qError("queryProcessTableNameRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
797
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
798
    goto PROCESS_NAME_OVER;
×
799
  }
800

801
  if (tDeserializeSTableMetaRsp(msg, msgSize, &metaRsp) != 0) {
1,230!
UNCOV
802
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
803
    goto PROCESS_NAME_OVER;
×
804
  }
805

806
  code = queryConvertTableMetaMsg(&metaRsp);
1,230✔
807
  if (code != TSDB_CODE_SUCCESS) {
1,230!
UNCOV
808
    goto PROCESS_NAME_OVER;
×
809
  }
810

811
  if (!IS_SYS_DBNAME(metaRsp.dbFName) &&
1,230!
812
      !tIsValidSchema(metaRsp.pSchemas, metaRsp.numOfColumns, metaRsp.numOfTags)) {
1,230!
UNCOV
813
    code = TSDB_CODE_TSC_INVALID_VALUE;
×
UNCOV
814
    goto PROCESS_NAME_OVER;
×
815
  }
816

817
  STableMetaOutput *pOut = output;
1,230✔
818
  tstrncpy(pOut->dbFName, metaRsp.dbFName, TSDB_DB_FNAME_LEN);
1,230✔
819
  pOut->dbId = metaRsp.dbId;
1,230✔
820

821
  if (metaRsp.tableType == TSDB_CHILD_TABLE) {
1,230✔
822
    SET_META_TYPE_BOTH_TABLE(pOut->metaType);
331✔
823

824
    tstrncpy(pOut->ctbName, metaRsp.tbName, TSDB_TABLE_NAME_LEN);
331✔
825
    tstrncpy(pOut->tbName, metaRsp.stbName, TSDB_TABLE_NAME_LEN);
331✔
826

827
    pOut->ctbMeta.vgId = metaRsp.vgId;
331✔
828
    pOut->ctbMeta.tableType = metaRsp.tableType;
331✔
829
    pOut->ctbMeta.uid = metaRsp.tuid;
331✔
830
    pOut->ctbMeta.suid = metaRsp.suid;
331✔
831

832
    code = queryCreateTableMetaExFromMsg(&metaRsp, true, &pOut->tbMeta);
331✔
833
  } else if (metaRsp.tableType == TSDB_VIRTUAL_CHILD_TABLE) {
899!
UNCOV
834
    SET_META_TYPE_BOTH_VTABLE(pOut->metaType);
×
835

836
    tstrncpy(pOut->ctbName, metaRsp.tbName, TSDB_TABLE_NAME_LEN);
×
837
    tstrncpy(pOut->tbName, metaRsp.stbName, TSDB_TABLE_NAME_LEN);
×
838

839
    code = queryCreateVCTableMetaFromMsg(&metaRsp, &pOut->vctbMeta);
×
UNCOV
840
    if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
841
      goto PROCESS_NAME_OVER;
×
842
    }
843

844
    code = queryCreateTableMetaExFromMsg(&metaRsp, true, &pOut->tbMeta);
×
845
  } else {
846
    SET_META_TYPE_TABLE(pOut->metaType);
899✔
847
    tstrncpy(pOut->tbName, metaRsp.tbName, TSDB_TABLE_NAME_LEN);
899✔
848
    code = queryCreateTableMetaExFromMsg(&metaRsp, (metaRsp.tableType == TSDB_SUPER_TABLE), &pOut->tbMeta);
899✔
849
  }
850

851
PROCESS_NAME_OVER:
1,230✔
852
  if (code != 0) {
1,230!
UNCOV
853
    qError("failed to process table name rsp since %s", tstrerror(code));
×
854
  }
855

856
  tFreeSTableMetaRsp(&metaRsp);
1,230✔
857
  return code;
1,230✔
858
}
859

860
int32_t queryProcessQnodeListRsp(void *output, char *msg, int32_t msgSize) {
10,557✔
861
  SQnodeListRsp out = {0};
10,557✔
862
  int32_t       code = 0;
10,557✔
863

864
  if (NULL == output || NULL == msg || msgSize <= 0) {
10,557!
UNCOV
865
    qError("queryProcessQnodeListRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
UNCOV
866
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
UNCOV
867
    return code;
×
868
  }
869

870
  out.qnodeList = (SArray *)output;
10,557✔
871
  if (tDeserializeSQnodeListRsp(msg, msgSize, &out) != 0) {
10,557!
872
    qError("invalid qnode list rsp msg, msgSize:%d", msgSize);
×
873
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
874
    return code;
×
875
  }
876

877
  return code;
10,557✔
878
}
879

880
int32_t queryProcessDnodeListRsp(void *output, char *msg, int32_t msgSize) {
345✔
881
  SDnodeListRsp out = {0};
345✔
882
  int32_t       code = 0;
345✔
883

884
  if (NULL == output || NULL == msg || msgSize <= 0) {
345!
UNCOV
885
    qError("queryProcessDnodeListRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
UNCOV
886
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
UNCOV
887
    return code;
×
888
  }
889

890
  if (tDeserializeSDnodeListRsp(msg, msgSize, &out) != 0) {
345!
891
    qError("invalid dnode list rsp msg, msgSize:%d", msgSize);
×
UNCOV
892
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
893
    return code;
×
894
  }
895

896
  *(SArray **)output = out.dnodeList;
345✔
897

898
  return code;
345✔
899
}
900

901
int32_t queryProcessGetSerVerRsp(void *output, char *msg, int32_t msgSize) {
1✔
902
  SServerVerRsp out = {0};
1✔
903
  int32_t       code = 0;
1✔
904

905
  if (NULL == output || NULL == msg || msgSize <= 0) {
1!
UNCOV
906
    qError("queryProcessGetSerVerRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
UNCOV
907
    code = TSDB_CODE_TSC_INVALID_INPUT;
×
UNCOV
908
    return code;
×
909
  }
910

911
  if (tDeserializeSServerVerRsp(msg, msgSize, &out) != 0) {
1!
UNCOV
912
    qError("invalid svr ver rsp msg, msgSize:%d", msgSize);
×
UNCOV
913
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
914
    return code;
×
915
  }
916

917
  *(char **)output = taosStrdup(out.ver);
1!
918
  if (NULL == *(char **)output) {
1!
UNCOV
919
    return terrno;
×
920
  }
921

922
  return code;
1✔
923
}
924

925
int32_t queryProcessGetDbCfgRsp(void *output, char *msg, int32_t msgSize) {
3,906✔
926
  SDbCfgRsp out = {0};
3,906✔
927

928
  if (NULL == output || NULL == msg || msgSize <= 0) {
3,906!
929
    qError("queryProcessGetDbCfgRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
930
    return TSDB_CODE_TSC_INVALID_INPUT;
×
931
  }
932

933
  if (tDeserializeSDbCfgRsp(msg, msgSize, &out) != 0) {
3,906!
UNCOV
934
    qError("tDeserializeSDbCfgRsp failed, msgSize:%d, dbCfgRsp:%lu", msgSize, sizeof(out));
×
935
    return TSDB_CODE_INVALID_MSG;
×
936
  }
937

938
  memcpy(output, &out, sizeof(out));
3,906✔
939

940
  return TSDB_CODE_SUCCESS;
3,906✔
941
}
942

UNCOV
943
int32_t queryProcessGetIndexRsp(void *output, char *msg, int32_t msgSize) {
×
944
  SUserIndexRsp out = {0};
×
945

UNCOV
946
  if (NULL == output || NULL == msg || msgSize <= 0) {
×
UNCOV
947
    qError("queryProcessGetIndexRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
UNCOV
948
    return TSDB_CODE_TSC_INVALID_INPUT;
×
949
  }
950

UNCOV
951
  if (tDeserializeSUserIndexRsp(msg, msgSize, &out) != 0) {
×
UNCOV
952
    qError("tDeserializeSUserIndexRsp failed, msgSize:%d", msgSize);
×
953
    return TSDB_CODE_INVALID_MSG;
×
954
  }
955

956
  memcpy(output, &out, sizeof(out));
×
957

UNCOV
958
  return TSDB_CODE_SUCCESS;
×
959
}
960

961
int32_t queryProcessRetrieveFuncRsp(void *output, char *msg, int32_t msgSize) {
87✔
962
  SRetrieveFuncRsp out = {0};
87✔
963

964
  if (NULL == output || NULL == msg || msgSize <= 0) {
87!
UNCOV
965
    qError("queryProcessRetrieveFuncRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
966
    return TSDB_CODE_TSC_INVALID_INPUT;
×
967
  }
968

969
  if (tDeserializeSRetrieveFuncRsp(msg, msgSize, &out) != 0) {
87!
UNCOV
970
    qError("tDeserializeSRetrieveFuncRsp failed, msgSize:%d", msgSize);
×
UNCOV
971
    return TSDB_CODE_INVALID_MSG;
×
972
  }
973

974
  if (1 != out.numOfFuncs) {
87!
975
    qError("invalid func num returned, numOfFuncs:%d", out.numOfFuncs);
×
976
    return TSDB_CODE_INVALID_MSG;
×
977
  }
978

979
  SFuncInfo *funcInfo = taosArrayGet(out.pFuncInfos, 0);
87✔
980

981
  memcpy(output, funcInfo, sizeof(*funcInfo));
87✔
982
  taosArrayDestroy(out.pFuncInfos);
87✔
983
  taosArrayDestroy(out.pFuncExtraInfos);
87✔
984

985
  return TSDB_CODE_SUCCESS;
87✔
986
}
987

988
int32_t queryProcessGetUserAuthRsp(void *output, char *msg, int32_t msgSize) {
20,295✔
989
  if (NULL == output || NULL == msg || msgSize <= 0) {
20,295!
990
    qError("queryProcessGetUserAuthRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
991
    return TSDB_CODE_TSC_INVALID_INPUT;
×
992
  }
993

994
  if (tDeserializeSGetUserAuthRsp(msg, msgSize, (SGetUserAuthRsp *)output) != 0) {
20,295✔
995
    qError("tDeserializeSGetUserAuthRsp failed, msgSize:%d", msgSize);
1!
996
    return TSDB_CODE_INVALID_MSG;
×
997
  }
998

999
  return TSDB_CODE_SUCCESS;
20,294✔
1000
}
1001

1002
int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
1✔
1003
  if (NULL == output || NULL == msg || msgSize <= 0) {
1!
1004
    qError("queryProcessGetTbIndexRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
1005
    return TSDB_CODE_TSC_INVALID_INPUT;
×
1006
  }
1007

1008
  STableIndexRsp *out = (STableIndexRsp *)output;
1✔
1009
  if (tDeserializeSTableIndexRsp(msg, msgSize, out) != 0) {
1!
1010
    qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize);
×
UNCOV
1011
    return TSDB_CODE_INVALID_MSG;
×
1012
  }
1013

1014
  return TSDB_CODE_SUCCESS;
1✔
1015
}
1016

1017
int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
195✔
1018
  if (NULL == output || NULL == msg || msgSize <= 0) {
195!
UNCOV
1019
    qError("queryProcessGetTbCfgRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
UNCOV
1020
    return TSDB_CODE_TSC_INVALID_INPUT;
×
1021
  }
1022

1023
  STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp));
195!
1024
  if(out == NULL) {
195!
UNCOV
1025
    return terrno;
×
1026
  }
1027
  if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) {
195!
UNCOV
1028
    qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize);
×
UNCOV
1029
    tFreeSTableCfgRsp(out);
×
UNCOV
1030
    taosMemoryFree(out);
×
UNCOV
1031
    return TSDB_CODE_INVALID_MSG;
×
1032
  }
1033

1034
  *(STableCfgRsp **)output = out;
195✔
1035

1036
  return TSDB_CODE_SUCCESS;
195✔
1037
}
1038

1039
int32_t queryProcessGetViewMetaRsp(void *output, char *msg, int32_t msgSize) {
311✔
1040
  if (NULL == output || NULL == msg || msgSize <= 0) {
311!
UNCOV
1041
    qError("queryProcessGetViewMetaRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
UNCOV
1042
    return TSDB_CODE_TSC_INVALID_INPUT;
×
1043
  }
1044

1045
  SViewMetaRsp *out = taosMemoryCalloc(1, sizeof(SViewMetaRsp));
311!
1046
  if (out == NULL) {
311!
UNCOV
1047
    return terrno;
×
1048
  }
1049
  if (tDeserializeSViewMetaRsp(msg, msgSize, out) != 0) {
311!
UNCOV
1050
    qError("tDeserializeSViewMetaRsp failed, msgSize:%d", msgSize);
×
UNCOV
1051
    tFreeSViewMetaRsp(out);
×
UNCOV
1052
    taosMemoryFree(out);
×
UNCOV
1053
    return TSDB_CODE_INVALID_MSG;
×
1054
  }
1055

1056
  qDebugL("view meta recved, dbFName:%s, view:%s, querySQL:%s", out->dbFName, out->name, out->querySql);
311✔
1057

1058
  *(SViewMetaRsp **)output = out;
311✔
1059

1060
  return TSDB_CODE_SUCCESS;
311✔
1061
}
1062

1063
int32_t queryProcessGetTbTSMARsp(void* output, char* msg, int32_t msgSize) {
934✔
1064
  if (NULL == output || NULL == msg || msgSize <= 0) {
934!
UNCOV
1065
    qError("queryProcessGetTbTSMARsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
UNCOV
1066
    return TSDB_CODE_TSC_INVALID_INPUT;
×
1067
  }
1068

1069
  if (tDeserializeTableTSMAInfoRsp(msg, msgSize, output) != 0) {
934!
UNCOV
1070
    qError("tDeserializeSViewMetaRsp failed, msgSize:%d", msgSize);
×
UNCOV
1071
    return TSDB_CODE_INVALID_MSG;
×
1072
  }
1073

1074
  return TSDB_CODE_SUCCESS;
934✔
1075
}
1076

1077
int32_t queryProcessStreamProgressRsp(void* output, char* msg, int32_t msgSize) {
4,868✔
1078
  if (!output || !msg || msgSize <= 0) {
4,868!
UNCOV
1079
    qError("queryProcessStreamProgressRsp: invalid input param, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
UNCOV
1080
    return TSDB_CODE_TSC_INVALID_INPUT;
×
1081
  }
1082

1083
  if (tDeserializeSStreamProgressRsp(msg, msgSize, output) != 0) {
4,868!
UNCOV
1084
    qError("tDeserializeStreamProgressRsp failed, msgSize:%d", msgSize);
×
UNCOV
1085
    return TSDB_CODE_INVALID_MSG;
×
1086
  }
1087
  return TSDB_CODE_SUCCESS;
4,868✔
1088
}
1089

NEW
1090
int32_t queryProcessVSubTablesRsp(void* output, char* msg, int32_t msgSize) {
×
NEW
1091
  if (!output || !msg || msgSize <= 0) {
×
NEW
1092
    qError("queryProcessVSubTablesRsp input error, output:%p, msg:%p, msgSize:%d", output, msg, msgSize);
×
NEW
1093
    return TSDB_CODE_TSC_INVALID_INPUT;
×
1094
  }
1095

NEW
1096
  SVSubTablesRsp* pRsp = (SVSubTablesRsp*)output;
×
NEW
1097
  int32_t code = tDeserializeSVSubTablesRsp(msg, msgSize, pRsp);
×
NEW
1098
  if (code != 0) {
×
NEW
1099
    qError("tDeserializeSVSubTablesRsp failed, msgSize: %d, error:%d", msgSize, code);
×
NEW
1100
    return code;
×
1101
  }
1102
  
NEW
1103
  return TSDB_CODE_SUCCESS;
×
1104
}
1105

1106
void initQueryModuleMsgHandle() {
15,748✔
1107
  queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
15,748✔
1108
  queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_NAME)] = queryBuildTableMetaReqMsg;
15,748✔
1109
  queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
15,748✔
1110
  queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)] = queryBuildUseDbMsg;
15,748✔
1111
  queryBuildMsg[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryBuildQnodeListMsg;
15,748✔
1112
  queryBuildMsg[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryBuildDnodeListMsg;
15,748✔
1113
  queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryBuildGetDBCfgMsg;
15,748✔
1114
  queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryBuildGetIndexMsg;
15,748✔
1115
  queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
15,748✔
1116
  queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg;
15,748✔
1117
  queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg;
15,748✔
1118
  queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
15,748✔
1119
  queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
15,748✔
1120
  queryBuildMsg[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryBuildGetSerVerMsg;
15,748✔
1121
  queryBuildMsg[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryBuildGetViewMetaMsg;
15,748✔
1122
  queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_TSMA)] = queryBuildGetTableTSMAMsg;
15,748✔
1123
  queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TSMA)] = queryBuildGetTSMAMsg;
15,748✔
1124
  queryBuildMsg[TMSG_INDEX(TDMT_VND_GET_STREAM_PROGRESS)] = queryBuildGetStreamProgressMsg;
15,748✔
1125
  queryBuildMsg[TMSG_INDEX(TDMT_VND_VSUBTABLES_META)] = queryBuildVSubTablesMsg;
15,748✔
1126

1127
  queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
15,748✔
1128
  queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_NAME)] = queryProcessTableNameRsp;
15,748✔
1129
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
15,748✔
1130
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_USE_DB)] = queryProcessUseDBRsp;
15,748✔
1131
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_QNODE_LIST)] = queryProcessQnodeListRsp;
15,748✔
1132
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_DNODE_LIST)] = queryProcessDnodeListRsp;
15,748✔
1133
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_DB_CFG)] = queryProcessGetDbCfgRsp;
15,748✔
1134
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_INDEX)] = queryProcessGetIndexRsp;
15,748✔
1135
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp;
15,748✔
1136
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp;
15,748✔
1137
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp;
15,748✔
1138
  queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
15,748✔
1139
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
15,748✔
1140
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_SERVER_VERSION)] = queryProcessGetSerVerRsp;
15,748✔
1141
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryProcessGetViewMetaRsp;
15,748✔
1142
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_TSMA)] = queryProcessGetTbTSMARsp;
15,748✔
1143
  queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TSMA)] = queryProcessGetTbTSMARsp;
15,748✔
1144
  queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_GET_STREAM_PROGRESS)] = queryProcessStreamProgressRsp;
15,748✔
1145
  queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_VSUBTABLES_META)] = queryProcessVSubTablesRsp;
15,748✔
1146
}
15,748✔
1147

1148
#pragma GCC diagnostic pop
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc