• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.23
/source/dnode/mnode/impl/src/mndShow.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndShow.h"
18
#include "mndPrivilege.h"
19
#include "mndUser.h"
20
#include "systable.h"
21

22
// Default number of rows requested from a retrieve handler per request.
#define SHOW_STEP_SIZE            100
23
// Rows requested per request when the show type is TSDB_MGMT_TABLE_COL.
#define SHOW_COLS_STEP_SIZE       4096
24
// Rows requested per request when the show type is TSDB_MGMT_TABLE_PRIVILEGES.
#define SHOW_PRIVILEGES_STEP_SIZE 2048
25

26
// Forward declarations of the file-local helpers that manage cached show objects
// and of the TDMT_MND_SYSTABLE_RETRIEVE message handler.
static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq);
27
static void      mndFreeShowObj(SShowObj *pShow);
28
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
29
static void      mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
30
static bool      mndCheckRetrieveFinished(SShowObj *pShow);
31
static int32_t   mndProcessRetrieveSysTableReq(SRpcMsg *pReq);
32

33
/*
 * Set up the show subsystem of an mnode: create the cache that holds show
 * objects (entries are destroyed through mndFreeShowObj) and register the
 * handler for TDMT_MND_SYSTABLE_RETRIEVE messages.
 *
 * Returns 0 on success, or TSDB_CODE_OUT_OF_MEMORY when the cache cannot be
 * created; in the failure case no message handler is registered.
 */
int32_t mndInitShow(SMnode *pMnode) {
  SShowMgmt *pShowMgmt = &pMnode->showMgmt;
  int32_t    ret = 0;

  pShowMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, 5000, true, (__cache_free_fn_t)mndFreeShowObj, "show");
  if (pShowMgmt->cache != NULL) {
    mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveSysTableReq);
    TAOS_RETURN(ret);
  }

  ret = TSDB_CODE_OUT_OF_MEMORY;
  mError("failed to alloc show cache since %s", tstrerror(ret));
  TAOS_RETURN(ret);
}
47

48
/*
 * Tear down the show subsystem: destroy the show-object cache if one exists
 * and clear the pointer so a repeated call is a no-op.
 */
void mndCleanupShow(SMnode *pMnode) {
  SShowMgmt *pShowMgmt = &pMnode->showMgmt;

  if (pShowMgmt->cache == NULL) {
    return;
  }

  taosCacheCleanup(pShowMgmt->cache);
  pShowMgmt->cache = NULL;
}
1,747✔
55

56
static int32_t convertToRetrieveType(char *name, int32_t len) {
228,018✔
57
  int32_t type = -1;
228,018✔
58

59
  if (strncasecmp(name, TSDB_INS_TABLE_DNODES, len) == 0) {
228,018✔
60
    type = TSDB_MGMT_TABLE_DNODE;
7,308✔
61
  } else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, len) == 0) {
220,710✔
62
    type = TSDB_MGMT_TABLE_MNODE;
4,510✔
63
    /*
64
      } else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, len) == 0) {
65
        type = TSDB_MGMT_TABLE_MODULE;
66
    */
67
  } else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, len) == 0) {
216,200✔
68
    type = TSDB_MGMT_TABLE_QNODE;
2,186✔
69
  } else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, len) == 0) {
214,014✔
70
    type = TSDB_MGMT_TABLE_SNODE;
2,184✔
71
  } else if (strncasecmp(name, TSDB_INS_TABLE_ANODES, len) == 0) {
211,830✔
72
    type = TSDB_MGMT_TABLE_ANODE;
3✔
73
  } else if (strncasecmp(name, TSDB_INS_TABLE_ANODES_FULL, len) == 0) {
211,827✔
74
    type = TSDB_MGMT_TABLE_ANODE_FULL;
1✔
75
  } else if (strncasecmp(name, TSDB_INS_TABLE_ARBGROUPS, len) == 0) {
211,826!
76
    type = TSDB_MGMT_TABLE_ARBGROUP;
×
77
  } else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, len) == 0) {
211,826✔
78
    type = TSDB_MGMT_TABLE_CLUSTER;
4,325✔
79
  } else if (strncasecmp(name, TSDB_INS_TABLE_DATABASES, len) == 0) {
207,501✔
80
    type = TSDB_MGMT_TABLE_DB;
27,624✔
81
  } else if (strncasecmp(name, TSDB_INS_TABLE_FUNCTIONS, len) == 0) {
179,877✔
82
    type = TSDB_MGMT_TABLE_FUNC;
7,246✔
83
  } else if (strncasecmp(name, TSDB_INS_TABLE_INDEXES, len) == 0) {
172,631✔
84
    type = TSDB_MGMT_TABLE_INDEX;
6,755✔
85
  } else if (strncasecmp(name, TSDB_INS_TABLE_STABLES, len) == 0) {
165,876✔
86
    type = TSDB_MGMT_TABLE_STB;
9,150✔
87
  } else if (strncasecmp(name, TSDB_INS_TABLE_TABLES, len) == 0) {
156,726!
88
    type = TSDB_MGMT_TABLE_TABLE;
×
89
  } else if (strncasecmp(name, TSDB_INS_TABLE_TAGS, len) == 0) {
156,726!
90
    type = TSDB_MGMT_TABLE_TAG;
×
91
  } else if (strncasecmp(name, TSDB_INS_TABLE_COLS, len) == 0) {
156,726✔
92
    type = TSDB_MGMT_TABLE_COL;
14,016✔
93
  } else if (strncasecmp(name, TSDB_INS_TABLE_TABLE_DISTRIBUTED, len) == 0) {
142,710✔
94
    //    type = TSDB_MGMT_TABLE_DIST;
95
  } else if (strncasecmp(name, TSDB_INS_TABLE_USERS, len) == 0) {
142,646✔
96
    type = TSDB_MGMT_TABLE_USER;
5,070✔
97
  } else if (strncasecmp(name, TSDB_INS_TABLE_USERS_FULL, len) == 0) {
137,576!
98
    type = TSDB_MGMT_TABLE_USER_FULL;
×
99
  } else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, len) == 0) {
137,576✔
100
    type = TSDB_MGMT_TABLE_GRANTS;
5,759✔
101
  } else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, len) == 0) {
131,817✔
102
    type = TSDB_MGMT_TABLE_VGROUP;
13,516✔
103
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_CONSUMERS, len) == 0) {
118,301✔
104
    type = TSDB_MGMT_TABLE_CONSUMERS;
8,007✔
105
  } else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIPTIONS, len) == 0) {
110,294✔
106
    type = TSDB_MGMT_TABLE_SUBSCRIPTIONS;
5,844✔
107
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_TRANS, len) == 0) {
104,450✔
108
    type = TSDB_MGMT_TABLE_TRANS;
7,470✔
109
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_SMAS, len) == 0) {
96,980!
110
    type = TSDB_MGMT_TABLE_SMAS;
×
111
  } else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, len) == 0) {
96,980✔
112
    type = TSDB_MGMT_TABLE_CONFIGS;
1,446✔
113
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_CONNECTIONS, len) == 0) {
95,534✔
114
    type = TSDB_MGMT_TABLE_CONNS;
6,516✔
115
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_QUERIES, len) == 0) {
89,018✔
116
    type = TSDB_MGMT_TABLE_QUERIES;
11,534✔
117
  } else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) {
77,484✔
118
    type = TSDB_MGMT_TABLE_VNODES;
9,094✔
119
  } else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, len) == 0) {
68,390✔
120
    type = TSDB_MGMT_TABLE_TOPICS;
5,075✔
121
  } else if (strncasecmp(name, TSDB_INS_TABLE_STREAMS, len) == 0) {
63,315✔
122
    type = TSDB_MGMT_TABLE_STREAMS;
11,988✔
123
  } else if (strncasecmp(name, TSDB_PERFS_TABLE_APPS, len) == 0) {
51,327✔
124
    type = TSDB_MGMT_TABLE_APPS;
10,806✔
125
  } else if (strncasecmp(name, TSDB_INS_TABLE_STREAM_TASKS, len) == 0) {
40,521✔
126
    type = TSDB_MGMT_TABLE_STREAM_TASKS;
21,588✔
127
  } else if (strncasecmp(name, TSDB_INS_TABLE_USER_PRIVILEGES, len) == 0) {
18,933✔
128
    type = TSDB_MGMT_TABLE_PRIVILEGES;
4,341✔
129
  } else if (strncasecmp(name, TSDB_INS_TABLE_VIEWS, len) == 0) {
14,592✔
130
    type = TSDB_MGMT_TABLE_VIEWS;
6,537✔
131
  } else if (strncasecmp(name, TSDB_INS_TABLE_COMPACTS, len) == 0) {
8,055✔
132
    type = TSDB_MGMT_TABLE_COMPACT;
2,239✔
133
  } else if (strncasecmp(name, TSDB_INS_TABLE_COMPACT_DETAILS, len) == 0) {
5,816✔
134
    type = TSDB_MGMT_TABLE_COMPACT_DETAIL;
5,759✔
135
  } else if (strncasecmp(name, TSDB_INS_TABLE_TRANSACTION_DETAILS, len) == 0) {
57!
136
    type = TSDB_MGMT_TABLE_TRANSACTION_DETAIL;
×
137
  } else if (strncasecmp(name, TSDB_INS_TABLE_GRANTS_FULL, len) == 0) {
57✔
138
    type = TSDB_MGMT_TABLE_GRANTS_FULL;
6✔
139
  } else if (strncasecmp(name, TSDB_INS_TABLE_GRANTS_LOGS, len) == 0) {
51✔
140
    type = TSDB_MGMT_TABLE_GRANTS_LOGS;
6✔
141
  } else if (strncasecmp(name, TSDB_INS_TABLE_MACHINES, len) == 0) {
45✔
142
    type = TSDB_MGMT_TABLE_MACHINES;
6✔
143
  } else if (strncasecmp(name, TSDB_INS_TABLE_ENCRYPTIONS, len) == 0) {
39!
144
    type = TSDB_MGMT_TABLE_ENCRYPTIONS;
×
145
  } else if (strncasecmp(name, TSDB_INS_TABLE_TSMAS, len) == 0) {
39✔
146
    type = TSDB_MGMT_TABLE_TSMAS;
6✔
147
  } else if (strncasecmp(name, TSDB_INS_DISK_USAGE, len) == 0) {
33!
148
    type = TSDB_MGMT_TABLE_USAGE;
×
149
  } else if (strncasecmp(name, TSDB_INS_TABLE_FILESETS, len) == 0) {
33!
150
    type = TSDB_MGMT_TABLE_FILESETS;
×
151
  } else {
152
    mError("invalid show name:%s len:%d", name, len);
33!
153
  }
154

155
  return type;
227,927✔
156
}
157

158
/*
 * Create a show object for a new retrieve request and store it in the show
 * cache, keyed by a freshly allocated show id.
 *
 * pMnode: owning mnode; its showMgmt supplies the id counter and the cache.
 * pReq:   retrieve request; tb selects the show type, db and filterTb are
 *         copied into the object.
 *
 * Returns the cached object, or NULL with terrno set to
 * TSDB_CODE_OUT_OF_MEMORY when the cache insert fails. The caller hands the
 * object back through mndReleaseShowObj.
 */
static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
  SShowMgmt *pMgmt = &pMnode->showMgmt;

  // Id 0 is reserved: the request handler treats showId == 0 as "no existing show object".
  // If the counter lands on 0, take the next value instead of handing out 0.
  int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1);
  if (showId == 0) showId = atomic_add_fetch_64(&pMgmt->showId, 1);

  int32_t size = sizeof(SShowObj);

  SShowObj showObj = {0};

  showObj.id = showId;
  showObj.pMnode = pMnode;
  // May be -1 when the table name has no mnode-side mapping.
  showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
  (void)memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
  tstrncpy(showObj.filterTb, pReq->filterTb, TSDB_TABLE_NAME_LEN);

  // NOTE(review): the "* 1000" suggests the cache keep time is in milliseconds - confirm against tcache.
  int32_t   keepTime = tsShellActivityTimer * 6 * 1000;
  SShowObj *pShow = taosCachePut(pMgmt->cache, &showId, sizeof(int64_t), &showObj, size, keepTime);
  if (pShow == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    mError("show:0x%" PRIx64 ", failed to put into cache since %s", showId, terrstr());
    return NULL;
  }

  mTrace("show:0x%" PRIx64 ", is created, data:%p", showId, pShow);
  return pShow;
}
185

186
/*
 * Destructor registered with the show cache: release the iterator still held
 * by a show object, using the free-iterator callback registered for the
 * object's show type (if any).
 */
static void mndFreeShowObj(SShowObj *pShow) {
  SMnode    *pMnode = pShow->pMnode;
  SShowMgmt *pMgmt = &pMnode->showMgmt;

  // The type is -1 when the requested table name had no mapping; never use it as an array index.
  ShowFreeIterFp freeFp = (pShow->type >= 0) ? pMgmt->freeIterFps[pShow->type] : NULL;
  if (freeFp != NULL && pShow->pIter != NULL) {
    mTrace("show:0x%" PRIx64 ", is destroying, data:%p, pIter:%p, ", pShow->id, pShow, pShow->pIter);

    (*freeFp)(pMnode, pShow->pIter);

    pShow->pIter = NULL;
  }

  mTrace("show:0x%" PRIx64 ", is destroyed, data:%p", pShow->id, pShow);
}
228,142✔
203

204
/*
 * Look up an existing show object in the show cache by its id.
 *
 * Returns the cached object, or NULL (after logging an error) when no entry
 * with that id exists. A non-NULL result is handed back through
 * mndReleaseShowObj.
 */
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId) {
  SShowObj *pObj = taosCacheAcquireByKey(pMnode->showMgmt.cache, &showId, sizeof(showId));

  if (pObj != NULL) {
    mTrace("show:0x%" PRIx64 ", acquired from cache, data:%p", pObj->id, pObj);
    return pObj;
  }

  mError("show:0x%" PRIx64 ", already destroyed", showId);
  return NULL;
}
216

217
/*
 * Hand a show object back to the show cache. A NULL object is ignored.
 *
 * forceRemove is only reported in the trace message: the cache is always
 * asked for a non-forced release, as a workaround for a bug in tcache.c.
 */
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove) {
  if (pShow != NULL) {
    mTrace("show:0x%" PRIx64 ", released from cache, data:%p force:%d", pShow->id, pShow, forceRemove);

    SShowMgmt *pShowMgmt = &pShow->pMnode->showMgmt;

    // A bug in tcache.c: forced removal is disabled, so always release without it.
    taosCacheRelease(pShowMgmt->cache, (void **)&pShow, false);
  }
}
228

229
/*
 * Handler for TDMT_MND_SYSTABLE_RETRIEVE: return one batch of rows of a
 * system table to the client.
 *
 * Flow:
 *   1. Deserialize the request. showId == 0 starts a new retrieval (a show
 *      object is created for the requested table); otherwise the existing
 *      show object is acquired from the cache.
 *   2. Pick the retrieve callback registered for the show type and check the
 *      caller's privileges.
 *   3. Build a data block matching the table schema, let the callback fill it,
 *      and encode schema + block into an SRetrieveMetaTableRsp stored in
 *      pReq->info.rsp.
 *
 * Returns TSDB_CODE_SUCCESS on success, otherwise an error code. Once a show
 * object has been obtained, every exit path releases it and frees the data
 * block and any unsent response buffer.
 */
static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
  int32_t                code = 0;
  SMnode                *pMnode = pReq->info.node;
  SShowMgmt             *pMgmt = &pMnode->showMgmt;
  SShowObj              *pShow = NULL;
  SSDataBlock           *pBlock = NULL;
  SRetrieveMetaTableRsp *pRsp = NULL;
  ShowRetrieveFp         retrieveFp = NULL;
  bool                   forceRemove = false;
  int32_t                rowsToRead = SHOW_STEP_SIZE;
  int32_t                size = 0;
  int32_t                rowsRead = 0;
  int32_t                numOfCols = 0;
  size_t                 dataEncodeBufSize = 0;
  SRetrieveTableReq      retrieveReq = {0};

  mDebug("mndProcessRetrieveSysTableReq start");
  TAOS_CHECK_RETURN(tDeserializeSRetrieveTableReq(pReq->pCont, pReq->contLen, &retrieveReq));

  mDebug("process to retrieve systable req db:%s, tb:%s, compactId:%" PRId64, retrieveReq.db, retrieveReq.tb,
         retrieveReq.compactId);

  if (retrieveReq.showId == 0) {
    // First request for this table: resolve its schema, then create the show object.
    STableMetaRsp *pMeta = taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb));
    if (pMeta == NULL) {
      pMeta = taosHashGet(pMnode->perfsMeta, retrieveReq.tb, strlen(retrieveReq.tb));
      if (pMeta == NULL) {
        code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
        mError("failed to process show-retrieve req:%p since %s", pShow, tstrerror(code));
        TAOS_RETURN(code);
      }
    }

    pShow = mndCreateShowObj(pMnode, &retrieveReq);
    if (pShow == NULL) {
      code = terrno;
      mError("failed to process show-meta req since %s", tstrerror(code));
      TAOS_RETURN(code);
    }

    pShow->pMeta = pMeta;
    pShow->numOfColumns = pShow->pMeta->numOfColumns;
  } else {
    // Follow-up request: continue with the show object created earlier.
    pShow = mndAcquireShowObj(pMnode, retrieveReq.showId);
    if (pShow == NULL) {
      code = TSDB_CODE_MND_INVALID_SHOWOBJ;
      mError("failed to process show-retrieve req:%p since %s", pShow, tstrerror(code));
      TAOS_RETURN(code);
    }
  }

  // From here on a show object is held; every failure must leave through _exit.

  if (pShow->type == TSDB_MGMT_TABLE_COL) {  // expend capacity for ins_columns
    rowsToRead = SHOW_COLS_STEP_SIZE;
  } else if (pShow->type == TSDB_MGMT_TABLE_PRIVILEGES) {
    rowsToRead = SHOW_PRIVILEGES_STEP_SIZE;
  }

  // The type is -1 when the table name has no mnode-side mapping; never index the handler table with it.
  if (pShow->type >= 0) {
    retrieveFp = pMgmt->retrieveFps[pShow->type];
  }
  if (retrieveFp == NULL) {
    code = TSDB_CODE_MSG_NOT_PROCESSED;
    // Log while the object is still held; it is released at _exit.
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, tstrerror(code));
    goto _exit;
  }

  mDebug("show:0x%" PRIx64 ", start retrieve data, type:%d", pShow->id, pShow->type);
  if (retrieveReq.user[0] != 0) {
    (void)memcpy(pReq->info.conn.user, retrieveReq.user, TSDB_USER_LEN);
  } else {
    (void)memcpy(pReq->info.conn.user, TSDB_DEFAULT_USER, strlen(TSDB_DEFAULT_USER) + 1);
  }

  if (retrieveReq.db[0] != 0) {
    code = mndCheckShowPrivilege(pMnode, pReq->info.conn.user, pShow->type, retrieveReq.db);
    if (code != 0) {
      goto _exit;
    }
  }
  if (pShow->type == TSDB_MGMT_TABLE_USER_FULL) {
    if (strcmp(pReq->info.conn.user, "root") != 0) {
      mError("The operation is not permitted, user:%s, pShow->type:%d", pReq->info.conn.user, pShow->type);
      code = TSDB_CODE_MND_NO_RIGHTS;
      goto _exit;
    }
  }

  numOfCols = pShow->pMeta->numOfColumns;

  code = createDataBlock(&pBlock);
  if (code) {
    goto _exit;
  }

  // Give the block one column per schema entry of the system table.
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData idata = {0};

    SSchema *p = &pShow->pMeta->pSchemas[i];

    idata.info.bytes = p->bytes;
    idata.info.type = p->type;
    idata.info.colId = p->colId;
    code = blockDataAppendColInfo(pBlock, &idata);
    if (code != 0) {
      goto _exit;
    }
  }

  code = blockDataEnsureCapacity(pBlock, rowsToRead);
  if (code != 0) {
    goto _exit;
  }

  if (mndCheckRetrieveFinished(pShow)) {
    mDebug("show:0x%" PRIx64 ", read finished, numOfRows:%d", pShow->id, pShow->numOfRows);
    rowsRead = 0;
  } else {
    rowsRead = (*retrieveFp)(pReq, pShow, pBlock, rowsToRead);
    if (rowsRead < 0) {
      // A negative row count is the callback's error code.
      code = rowsRead;
      mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
      forceRemove = true;
      goto _exit;
    }

    pBlock->info.rows = rowsRead;
    mDebug("show:0x%" PRIx64 ", stop retrieve data, rowsRead:%d numOfRows:%d", pShow->id, rowsRead, pShow->numOfRows);
  }

  // Response layout: header, column count, one SSysTableSchema per column, encoded block.
  dataEncodeBufSize = blockGetEncodeSize(pBlock);
  size = sizeof(SRetrieveMetaTableRsp) + sizeof(int32_t) + sizeof(SSysTableSchema) * pShow->pMeta->numOfColumns +
         dataEncodeBufSize;

  pRsp = rpcMallocCont(size);
  if (pRsp == NULL) {
    code = terrno;  // capture the failure reason before logging it
    mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, tstrerror(code));
    goto _exit;
  }

  pRsp->handle = htobe64(pShow->id);

  if (rowsRead > 0) {
    char    *pStart = pRsp->data;
    SSchema *ps = pShow->pMeta->pSchemas;

    *(int32_t *)pStart = htonl(pShow->pMeta->numOfColumns);
    pStart += sizeof(int32_t);  // number of columns

    for (int32_t i = 0; i < pShow->pMeta->numOfColumns; ++i) {
      SSysTableSchema *pSchema = (SSysTableSchema *)pStart;
      pSchema->bytes = htonl(ps[i].bytes);
      pSchema->colId = htons(ps[i].colId);
      pSchema->type = ps[i].type;

      pStart += sizeof(SSysTableSchema);
    }

    int32_t len = blockEncode(pBlock, pStart, dataEncodeBufSize, pShow->pMeta->numOfColumns);
    if (len < 0) {
      code = terrno;  // capture the failure reason before logging it
      mError("show:0x%" PRIx64 ", failed to retrieve data since %s", pShow->id, tstrerror(code));
      goto _exit;  // frees the response buffer and the block, releases the show object
    }
  }

  pRsp->numOfRows = htonl(rowsRead);
  pRsp->precision = TSDB_TIME_PRECISION_MILLI;  // millisecond time precision
  pReq->info.rsp = pRsp;
  pReq->info.rspLen = size;

  if (rowsRead == 0 || mndCheckRetrieveFinished(pShow)) {
    pRsp->completed = 1;
    mDebug("show:0x%" PRIx64 ", retrieve completed", pShow->id);
    mndReleaseShowObj(pShow, true);
  } else {
    mDebug("show:0x%" PRIx64 ", retrieve not completed yet", pShow->id);
    mndReleaseShowObj(pShow, false);
  }

  blockDataDestroy(pBlock);
  return TSDB_CODE_SUCCESS;

_exit:
  // Common failure path: pRsp has not been attached to pReq when we get here.
  mndReleaseShowObj(pShow, forceRemove);
  if (pBlock != NULL) {
    blockDataDestroy(pBlock);
  }
  if (pRsp != NULL) {
    rpcFreeCont(pRsp);
  }
  TAOS_RETURN(code);
}
404

405
/*
 * Report whether a show object has delivered all of its rows: true when it no
 * longer holds an iterator and has already produced at least one row.
 */
static bool mndCheckRetrieveFinished(SShowObj *pShow) {
  return pShow->pIter == NULL && pShow->numOfRows != 0;
}
411

412
/* Register `fp` as the retrieve callback for system tables of type `showType`. */
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
  pMnode->showMgmt.retrieveFps[showType] = fp;
}
66,424✔
416

417
/* Register `fp` as the iterator-release callback for system tables of type `showType`. */
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) {
  pMnode->showMgmt.freeIterFps[showType] = fp;
}
61,180✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc