• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.0
/source/dnode/vnode/src/vnd/vnodeQuery.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "tsdb.h"
17
#include "vnd.h"
18

19
#define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags)                                                    \
20
  do {                                                                                                        \
21
    int##vType##_t newVal = atomic_sub_fetch_##vType(&(pVar), (oVal));                                        \
22
    if (newVal < 0) {                                                                                         \
23
      vWarn("vgId:%d, %s, abnormal val:%" PRIi64 ", old val:%" PRIi64, TD_VID(pVnode), tags, newVal, (oVal)); \
24
    }                                                                                                         \
25
  } while (0)
26

27
int vnodeQueryOpen(SVnode *pVnode) {
11,897✔
28
  return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
11,897✔
29
}
30

31
void vnodeQueryPreClose(SVnode *pVnode) { qWorkerStopAllTasks((void *)pVnode->pQuery); }
11,903✔
32

33
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
11,900✔
34

35
int32_t fillTableColCmpr(SMetaReader *reader, SSchemaExt *pExt, int32_t numOfCol) {
1,262,448✔
36
  int8_t tblType = reader->me.type;
1,262,448✔
37
  if (withExtSchema(tblType)) {
1,262,448!
38
    SColCmprWrapper *p = &(reader->me.colCmpr);
1,262,427✔
39
    if (numOfCol != p->nCols) {
1,262,427!
40
      vError("fillTableColCmpr table type:%d, col num:%d, col cmpr num:%d mismatch", tblType, numOfCol, p->nCols);
×
41
      return TSDB_CODE_APP_ERROR;
×
42
    }
43
    for (int i = 0; i < p->nCols; i++) {
31,906,691✔
44
      SColCmpr *pCmpr = &p->pColCmpr[i];
30,644,264✔
45
      pExt[i].colId = pCmpr->id;
30,644,264✔
46
      pExt[i].compress = pCmpr->alg;
30,644,264✔
47
    }
48
  }
49
  return 0;
1,262,416✔
50
}
51

52
void vnodePrintTableMeta(STableMetaRsp *pMeta) {
1,262,252✔
53
  if (!(qDebugFlag & DEBUG_DEBUG)) {
1,262,252✔
54
    return;
1,245,925✔
55
  }
56

57
  qDebug("tbName:%s", pMeta->tbName);
16,327!
58
  qDebug("stbName:%s", pMeta->stbName);
16,327!
59
  qDebug("dbFName:%s", pMeta->dbFName);
16,327!
60
  qDebug("dbId:%" PRId64, pMeta->dbId);
16,327!
61
  qDebug("numOfTags:%d", pMeta->numOfTags);
16,327!
62
  qDebug("numOfColumns:%d", pMeta->numOfColumns);
16,327!
63
  qDebug("precision:%d", pMeta->precision);
16,327!
64
  qDebug("tableType:%d", pMeta->tableType);
16,327!
65
  qDebug("sversion:%d", pMeta->sversion);
16,327!
66
  qDebug("tversion:%d", pMeta->tversion);
16,327!
67
  qDebug("suid:%" PRIu64, pMeta->suid);
16,327!
68
  qDebug("tuid:%" PRIu64, pMeta->tuid);
16,327!
69
  qDebug("vgId:%d", pMeta->vgId);
16,327!
70
  qDebug("sysInfo:%d", pMeta->sysInfo);
16,327!
71
  if (pMeta->pSchemas) {
16,327!
72
    for (int32_t i = 0; i < (pMeta->numOfColumns + pMeta->numOfTags); ++i) {
354,690✔
73
      SSchema *pSchema = pMeta->pSchemas + i;
338,343✔
74
      qDebug("%d col/tag: type:%d, flags:%d, colId:%d, bytes:%d, name:%s", i, pSchema->type, pSchema->flags,
338,343!
75
             pSchema->colId, pSchema->bytes, pSchema->name);
76
    }
77
  }
78
}
79

UNCOV
80
int32_t fillTableColRef(SMetaReader *reader, SColRef *pRef, int32_t numOfCol) {
×
UNCOV
81
  int8_t tblType = reader->me.type;
×
UNCOV
82
  if (hasRefCol(tblType)) {
×
UNCOV
83
    SColRefWrapper *p = &(reader->me.colRef);
×
UNCOV
84
    if (numOfCol != p->nCols) {
×
UNCOV
85
      vError("fillTableColRef table type:%d, col num:%d, col cmpr num:%d mismatch", tblType, numOfCol, p->nCols);
×
UNCOV
86
      return TSDB_CODE_APP_ERROR;
×
87
    }
UNCOV
88
    for (int i = 0; i < p->nCols; i++) {
×
UNCOV
89
      SColRef *pColRef = &p->pColRef[i];
×
UNCOV
90
      pRef[i].hasRef = pColRef->hasRef;
×
UNCOV
91
      pRef[i].id = pColRef->id;
×
UNCOV
92
      if(pRef[i].hasRef) {
×
UNCOV
93
        tstrncpy(pRef[i].refDbName, pColRef->refDbName, TSDB_DB_NAME_LEN);
×
UNCOV
94
        tstrncpy(pRef[i].refTableName, pColRef->refTableName, TSDB_TABLE_NAME_LEN);
×
UNCOV
95
        tstrncpy(pRef[i].refColName, pColRef->refColName, TSDB_COL_NAME_LEN);
×
96
      }
97
    }
98
  }
UNCOV
99
  return 0;
×
100
}
101

102
int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
1,296,406✔
103
  STableInfoReq  infoReq = {0};
1,296,406✔
104
  STableMetaRsp  metaRsp = {0};
1,296,406✔
105
  SMetaReader    mer1 = {0};
1,296,406✔
106
  SMetaReader    mer2 = {0};
1,296,406✔
107
  char           tableFName[TSDB_TABLE_FNAME_LEN];
108
  bool           reqTbUid = false;
1,296,406✔
109
  SRpcMsg        rpcMsg = {0};
1,296,406✔
110
  int32_t        code = 0;
1,296,406✔
111
  int32_t        rspLen = 0;
1,296,406✔
112
  void          *pRsp = NULL;
1,296,406✔
113
  SSchemaWrapper schema = {0};
1,296,406✔
114
  SSchemaWrapper schemaTag = {0};
1,296,406✔
115
  uint8_t        autoCreateCtb = 0;
1,296,406✔
116

117
  // decode req
118
  if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
1,296,406!
UNCOV
119
    code = terrno;
×
UNCOV
120
    goto _exit4;
×
121
  }
122
  autoCreateCtb = infoReq.autoCreateCtb;
1,297,036✔
123

124
  if (infoReq.option == REQ_OPT_TBUID) reqTbUid = true;
1,297,036✔
125
  metaRsp.dbId = pVnode->config.dbId;
1,297,036✔
126
  tstrncpy(metaRsp.tbName, infoReq.tbName, TSDB_TABLE_NAME_LEN);
1,297,036✔
127
  (void)memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
1,297,036✔
128

129
  if (!reqTbUid) {
1,297,036✔
130
    (void)tsnprintf(tableFName, TSDB_TABLE_FNAME_LEN, "%s.%s", infoReq.dbFName, infoReq.tbName);
1,296,244✔
131
    code = vnodeValidateTableHash(pVnode, tableFName);
1,296,371✔
132
    if (code) {
1,296,272!
UNCOV
133
      goto _exit4;
×
134
    }
135
  }
136

137
  // query meta
138
  metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
1,297,064✔
139
  if (reqTbUid) {
1,296,975✔
140
    SET_ERRNO(0);
538✔
141
    uint64_t tbUid = taosStr2UInt64(infoReq.tbName, NULL, 10);
538✔
142
    if (ERRNO == ERANGE || tbUid == 0) {
538!
UNCOV
143
      code = TSDB_CODE_TDB_TABLE_NOT_EXIST;
×
144
      goto _exit3;
137✔
145
    }
146
    SMetaReader mr3 = {0};
538✔
147
    metaReaderDoInit(&mr3, ((SVnode *)pVnode)->pMeta, META_READER_NOLOCK);
538✔
148
    if ((code = metaReaderGetTableEntryByUid(&mr3, tbUid)) < 0) {
538✔
149
      metaReaderClear(&mr3);
137✔
150
      TAOS_CHECK_GOTO(code, NULL, _exit3);
137!
151
    }
152
    tstrncpy(metaRsp.tbName, mr3.me.name, TSDB_TABLE_NAME_LEN);
401✔
153
    metaReaderClear(&mr3);
401✔
154
    TAOS_CHECK_GOTO(metaGetTableEntryByName(&mer1, metaRsp.tbName), NULL, _exit3);
401!
155
  } else if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) {
1,296,437✔
156
    code = terrno;
34,197✔
157
    goto _exit3;
34,285✔
158
  }
159

160
  metaRsp.tableType = mer1.me.type;
1,262,700✔
161
  metaRsp.vgId = TD_VID(pVnode);
1,262,700✔
162
  metaRsp.tuid = mer1.me.uid;
1,262,700✔
163

164
  switch (mer1.me.type) {
1,262,700!
165
    case TSDB_SUPER_TABLE: {
651,310✔
166
      (void)strcpy(metaRsp.stbName, mer1.me.name);
651,310✔
167
      schema = mer1.me.stbEntry.schemaRow;
651,310✔
168
      schemaTag = mer1.me.stbEntry.schemaTag;
651,310✔
169
      metaRsp.suid = mer1.me.uid;
651,310✔
170
      break;
651,310✔
171
    }
172
    case TSDB_CHILD_TABLE:
579,110✔
173
    case TSDB_VIRTUAL_CHILD_TABLE:{
174
      metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
579,110✔
175
      if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit2;
579,119!
176

177
      (void)strcpy(metaRsp.stbName, mer2.me.name);
579,121✔
178
      metaRsp.suid = mer2.me.uid;
579,121✔
179
      schema = mer2.me.stbEntry.schemaRow;
579,121✔
180
      schemaTag = mer2.me.stbEntry.schemaTag;
579,121✔
181
      break;
579,121✔
182
    }
183
    case TSDB_NORMAL_TABLE:
32,280✔
184
    case TSDB_VIRTUAL_NORMAL_TABLE: {
185
      schema = mer1.me.ntbEntry.schemaRow;
32,280✔
186
      break;
32,280✔
187
    }
UNCOV
188
    default: {
×
UNCOV
189
      vError("vnodeGetTableMeta get invalid table type:%d", mer1.me.type);
×
UNCOV
190
      goto _exit3;
×
191
    }
192
  }
193

194
  metaRsp.numOfTags = schemaTag.nCols;
1,262,711✔
195
  metaRsp.numOfColumns = schema.nCols;
1,262,711✔
196
  metaRsp.precision = pVnode->config.tsdbCfg.precision;
1,262,711✔
197
  metaRsp.sversion = schema.version;
1,262,711✔
198
  metaRsp.tversion = schemaTag.version;
1,262,711✔
199
  metaRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (metaRsp.numOfColumns + metaRsp.numOfTags));
1,262,711!
200
  metaRsp.pSchemaExt = (SSchemaExt *)taosMemoryCalloc(metaRsp.numOfColumns, sizeof(SSchemaExt));
1,262,612!
201
  if (NULL == metaRsp.pSchemas || NULL == metaRsp.pSchemaExt) {
1,262,405!
UNCOV
202
    code = terrno;
×
UNCOV
203
    goto _exit;
×
204
  }
205
  (void)memcpy(metaRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
1,262,430✔
206
  if (schemaTag.nCols) {
1,262,430✔
207
    (void)memcpy(metaRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
1,229,954✔
208
  }
209
  if (metaRsp.pSchemaExt) {
1,262,430✔
210
    SMetaReader *pReader = mer1.me.type == TSDB_CHILD_TABLE ? &mer2 : &mer1;
1,262,257✔
211
    code = fillTableColCmpr(pReader, metaRsp.pSchemaExt, metaRsp.numOfColumns);
1,262,257✔
212
    if (code < 0) {
1,262,494!
UNCOV
213
      goto _exit;
×
214
    }
215
    for (int32_t i = 0; i < metaRsp.numOfColumns && pReader->me.pExtSchemas; i++) {
1,453,368✔
216
      metaRsp.pSchemaExt[i].typeMod = pReader->me.pExtSchemas[i].typeMod;
190,874✔
217
    }
218
  } else {
219
    code = TSDB_CODE_OUT_OF_MEMORY;
173✔
220
    goto _exit;
173✔
221
  }
222
  if (hasRefCol(mer1.me.type)) {
1,262,494✔
223
    metaRsp.pColRefs = (SColRef*)taosMemoryMalloc(sizeof(SColRef) * metaRsp.numOfColumns);
282!
UNCOV
224
    if (metaRsp.pColRefs) {
×
UNCOV
225
      code = fillTableColRef(&mer1, metaRsp.pColRefs, metaRsp.numOfColumns);
×
UNCOV
226
      if (code < 0) {
×
UNCOV
227
        goto _exit;
×
228
      }
229
    }
UNCOV
230
    metaRsp.numOfColRefs = metaRsp.numOfColumns;
×
231
  } else {
232
    metaRsp.pColRefs = NULL;
1,262,264✔
233
    metaRsp.numOfColRefs = 0;
1,262,264✔
234
  }
235

236
  vnodePrintTableMeta(&metaRsp);
1,262,264✔
237

238
  // encode and send response
239
  rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
1,262,105✔
240
  if (rspLen < 0) {
1,261,545!
UNCOV
241
    code = terrno;
×
UNCOV
242
    goto _exit;
×
243
  }
244

245
  if (direct) {
1,261,545✔
246
    pRsp = rpcMallocCont(rspLen);
229✔
247
  } else {
248
    pRsp = taosMemoryCalloc(1, rspLen);
1,261,316!
249
  }
250

251
  if (pRsp == NULL) {
1,261,198!
UNCOV
252
    code = terrno;
×
UNCOV
253
    goto _exit;
×
254
  }
255

256
  rspLen = tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
1,261,198✔
257
  if (rspLen < 0) {
1,262,512!
UNCOV
258
    code = terrno;
×
UNCOV
259
    goto _exit;
×
260
  }
261

262
_exit:
1,262,512✔
263
  taosMemoryFree(metaRsp.pColRefs);
1,262,685!
264
  taosMemoryFree(metaRsp.pSchemas);
1,261,929!
265
  taosMemoryFree(metaRsp.pSchemaExt);
1,262,379!
266
_exit2:
1,262,444✔
267
  metaReaderClear(&mer2);
1,262,444✔
268
_exit3:
1,296,856✔
269
  metaReaderClear(&mer1);
1,296,856✔
270
_exit4:
1,297,015✔
271
  rpcMsg.info = pMsg->info;
1,297,015✔
272
  rpcMsg.pCont = pRsp;
1,297,015✔
273
  rpcMsg.contLen = rspLen;
1,297,015✔
274
  rpcMsg.code = code;
1,297,015✔
275
  rpcMsg.msgType = pMsg->msgType;
1,297,015✔
276

277
  if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST && autoCreateCtb == 1) {
1,297,015!
278
    code = TSDB_CODE_SUCCESS;
×
279
  }
280

281
  if (code) {
1,297,015✔
282
    qError("get table %s meta with %" PRIu8 " failed cause of %s", infoReq.tbName, infoReq.option, tstrerror(code));
34,421!
283
  }
284

285
  if (direct) {
1,296,464✔
286
    tmsgSendRsp(&rpcMsg);
1,888✔
287
  } else {
288
    *pMsg = rpcMsg;
1,294,576✔
289
  }
290

291
  return code;
1,295,990✔
292
}
293

294
int32_t vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
95✔
295
  STableCfgReq   cfgReq = {0};
95✔
296
  STableCfgRsp   cfgRsp = {0};
95✔
297
  SMetaReader    mer1 = {0};
95✔
298
  SMetaReader    mer2 = {0};
95✔
299
  char           tableFName[TSDB_TABLE_FNAME_LEN];
300
  SRpcMsg        rpcMsg = {0};
95✔
301
  int32_t        code = 0;
95✔
302
  int32_t        rspLen = 0;
95✔
303
  void          *pRsp = NULL;
95✔
304
  SSchemaWrapper schema = {0};
95✔
305
  SSchemaWrapper schemaTag = {0};
95✔
306

307
  // decode req
308
  if (tDeserializeSTableCfgReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
95!
UNCOV
309
    code = terrno;
×
UNCOV
310
    goto _exit;
×
311
  }
312

313
  tstrncpy(cfgRsp.tbName, cfgReq.tbName, TSDB_TABLE_NAME_LEN);
95✔
314
  (void)memcpy(cfgRsp.dbFName, cfgReq.dbFName, sizeof(cfgRsp.dbFName));
95✔
315

316
  (void)tsnprintf(tableFName, TSDB_TABLE_FNAME_LEN, "%s.%s", cfgReq.dbFName, cfgReq.tbName);
95✔
317
  code = vnodeValidateTableHash(pVnode, tableFName);
95✔
318
  if (code) {
95!
UNCOV
319
    goto _exit;
×
320
  }
321

322
  // query meta
323
  metaReaderDoInit(&mer1, pVnode->pMeta, META_READER_LOCK);
95✔
324

325
  if (metaGetTableEntryByName(&mer1, cfgReq.tbName) < 0) {
95!
UNCOV
326
    code = terrno;
×
UNCOV
327
    goto _exit;
×
328
  }
329

330
  cfgRsp.tableType = mer1.me.type;
95✔
331

332
  if (mer1.me.type == TSDB_SUPER_TABLE) {
95!
333
    code = TSDB_CODE_VND_HASH_MISMATCH;
×
334
    goto _exit;
×
335
  } else if (mer1.me.type == TSDB_CHILD_TABLE || mer1.me.type == TSDB_VIRTUAL_CHILD_TABLE) {
95!
336
    metaReaderDoInit(&mer2, pVnode->pMeta, META_READER_NOLOCK);
69✔
337
    if (metaReaderGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
69!
338

339
    tstrncpy(cfgRsp.stbName, mer2.me.name, TSDB_TABLE_NAME_LEN);
69✔
340
    schema = mer2.me.stbEntry.schemaRow;
69✔
341
    schemaTag = mer2.me.stbEntry.schemaTag;
69✔
342
    cfgRsp.ttl = mer1.me.ctbEntry.ttlDays;
69✔
343
    cfgRsp.commentLen = mer1.me.ctbEntry.commentLen;
69✔
344
    if (mer1.me.ctbEntry.commentLen > 0) {
69!
UNCOV
345
      cfgRsp.pComment = taosStrdup(mer1.me.ctbEntry.comment);
×
UNCOV
346
      if (NULL == cfgRsp.pComment) {
×
UNCOV
347
        code = terrno;
×
UNCOV
348
        goto _exit;
×
349
      }
350
    }
351
    STag *pTag = (STag *)mer1.me.ctbEntry.pTags;
69✔
352
    cfgRsp.tagsLen = pTag->len;
69✔
353
    cfgRsp.pTags = taosMemoryMalloc(cfgRsp.tagsLen);
69!
354
    if (NULL == cfgRsp.pTags) {
69!
UNCOV
355
      code = terrno;
×
UNCOV
356
      goto _exit;
×
357
    }
358
    (void)memcpy(cfgRsp.pTags, pTag, cfgRsp.tagsLen);
69✔
359
  } else if (mer1.me.type == TSDB_NORMAL_TABLE || mer1.me.type == TSDB_VIRTUAL_NORMAL_TABLE) {
26!
360
    schema = mer1.me.ntbEntry.schemaRow;
26✔
361
    cfgRsp.ttl = mer1.me.ntbEntry.ttlDays;
26✔
362
    cfgRsp.commentLen = mer1.me.ntbEntry.commentLen;
26✔
363
    if (mer1.me.ntbEntry.commentLen > 0) {
26!
UNCOV
364
      cfgRsp.pComment = taosStrdup(mer1.me.ntbEntry.comment);
×
UNCOV
365
      if (NULL == cfgRsp.pComment) {
×
UNCOV
366
        code = terrno;
×
UNCOV
367
        goto _exit;
×
368
      }
369
    }
370
  } else {
UNCOV
371
    vError("vnodeGetTableCfg get invalid table type:%d", mer1.me.type);
×
UNCOV
372
    code = TSDB_CODE_APP_ERROR;
×
UNCOV
373
    goto _exit;
×
374
  }
375

376
  cfgRsp.numOfTags = schemaTag.nCols;
95✔
377
  cfgRsp.numOfColumns = schema.nCols;
95✔
378
  cfgRsp.virtualStb = false; // vnode don't have super table, so it's always false
95✔
379
  cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags));
95!
380
  cfgRsp.pSchemaExt = (SSchemaExt *)taosMemoryMalloc(cfgRsp.numOfColumns * sizeof(SSchemaExt));
95!
381
  cfgRsp.pColRefs = (SColRef *)taosMemoryMalloc(sizeof(SColRef) * cfgRsp.numOfColumns);
95!
382

383
  if (NULL == cfgRsp.pSchemas || NULL == cfgRsp.pSchemaExt || NULL == cfgRsp.pColRefs) {
95!
UNCOV
384
    code = terrno;
×
UNCOV
385
    goto _exit;
×
386
  }
387
  (void)memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
95✔
388
  if (schemaTag.nCols) {
95✔
389
    (void)memcpy(cfgRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
69✔
390
  }
391

392
  SMetaReader     *pReader = (mer1.me.type == TSDB_CHILD_TABLE || mer1.me.type == TSDB_VIRTUAL_CHILD_TABLE) ? &mer2 : &mer1;
95!
393
  SColCmprWrapper *pColCmpr = &pReader->me.colCmpr;
95✔
394
  SColRefWrapper  *pColRef = &mer1.me.colRef;
95✔
395

396
  if (withExtSchema(cfgRsp.tableType)) {
95!
397
    for (int32_t i = 0; i < cfgRsp.numOfColumns; i++) {
851✔
398
      SColCmpr   *pCmpr = &pColCmpr->pColCmpr[i];
756✔
399
      SSchemaExt *pSchExt = cfgRsp.pSchemaExt + i;
756✔
400
      pSchExt->colId = pCmpr->id;
756✔
401
      pSchExt->compress = pCmpr->alg;
756✔
402
      if (pReader->me.pExtSchemas)
756✔
403
        pSchExt->typeMod = pReader->me.pExtSchemas[i].typeMod;
99✔
404
      else
405
        pSchExt->typeMod = 0;
657✔
406
    }
407
  }
408

409
  cfgRsp.virtualStb = false;
95✔
410
  if (hasRefCol(cfgRsp.tableType)) {
95!
UNCOV
411
    for (int32_t i = 0; i < cfgRsp.numOfColumns; i++) {
×
UNCOV
412
      SColRef *pRef = &pColRef->pColRef[i];
×
UNCOV
413
      cfgRsp.pColRefs[i].hasRef = pRef->hasRef;
×
UNCOV
414
      cfgRsp.pColRefs[i].id = pRef->id;
×
UNCOV
415
      if (cfgRsp.pColRefs[i].hasRef) {
×
UNCOV
416
        tstrncpy(cfgRsp.pColRefs[i].refDbName, pRef->refDbName, TSDB_DB_NAME_LEN);
×
UNCOV
417
        tstrncpy(cfgRsp.pColRefs[i].refTableName, pRef->refTableName, TSDB_TABLE_NAME_LEN);
×
UNCOV
418
        tstrncpy(cfgRsp.pColRefs[i].refColName, pRef->refColName, TSDB_COL_NAME_LEN);
×
419
      }
420
    }
421
  }
422

423
  // encode and send response
424
  rspLen = tSerializeSTableCfgRsp(NULL, 0, &cfgRsp);
95✔
425
  if (rspLen < 0) {
95!
UNCOV
426
    code = terrno;
×
UNCOV
427
    goto _exit;
×
428
  }
429

430
  if (direct) {
95!
431
    pRsp = rpcMallocCont(rspLen);
×
432
  } else {
433
    pRsp = taosMemoryCalloc(1, rspLen);
95!
434
  }
435

436
  if (pRsp == NULL) {
95!
437
    code = terrno;
×
438
    goto _exit;
×
439
  }
440

441
  rspLen = tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp);
95✔
442
  if (rspLen < 0) {
95!
UNCOV
443
    code = terrno;
×
UNCOV
444
    goto _exit;
×
445
  }
446

447
_exit:
95✔
448
  rpcMsg.info = pMsg->info;
95✔
449
  rpcMsg.pCont = pRsp;
95✔
450
  rpcMsg.contLen = rspLen;
95✔
451
  rpcMsg.code = code;
95✔
452
  rpcMsg.msgType = pMsg->msgType;
95✔
453

454
  if (code) {
95!
UNCOV
455
    qError("get table %s cfg failed cause of %s", cfgReq.tbName, tstrerror(code));
×
456
  }
457

458
  if (direct) {
95!
UNCOV
459
    tmsgSendRsp(&rpcMsg);
×
460
  } else {
461
    *pMsg = rpcMsg;
95✔
462
  }
463

464
  tFreeSTableCfgRsp(&cfgRsp);
95✔
465
  metaReaderClear(&mer2);
95✔
466
  metaReaderClear(&mer1);
95✔
467
  return code;
95✔
468
}
469

470
static FORCE_INLINE void vnodeFreeSBatchRspMsg(void *p) {
471
  if (NULL == p) {
472
    return;
473
  }
474

475
  SBatchRspMsg *pRsp = (SBatchRspMsg *)p;
476
  rpcFreeCont(pRsp->msg);
477
}
478

479
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
1,108,310✔
480
  int32_t      code = 0;
1,108,310✔
481
  int32_t      rspSize = 0;
1,108,310✔
482
  SBatchReq    batchReq = {0};
1,108,310✔
483
  SBatchMsg   *req = NULL;
1,108,310✔
484
  SBatchRspMsg rsp = {0};
1,108,310✔
485
  SBatchRsp    batchRsp = {0};
1,108,310✔
486
  SRpcMsg      reqMsg = *pMsg;
1,108,310✔
487
  SRpcMsg      rspMsg = {0};
1,108,310✔
488
  void        *pRsp = NULL;
1,108,310✔
489

490
  if (tDeserializeSBatchReq(pMsg->pCont, pMsg->contLen, &batchReq)) {
1,108,310!
UNCOV
491
    code = terrno;
×
492
    qError("tDeserializeSBatchReq failed");
×
493
    goto _exit;
×
494
  }
495

496
  int32_t msgNum = taosArrayGetSize(batchReq.pMsgs);
1,108,558✔
497
  if (msgNum >= MAX_META_MSG_IN_BATCH) {
1,108,340!
UNCOV
498
    code = TSDB_CODE_INVALID_MSG;
×
UNCOV
499
    qError("too many msgs %d in vnode batch meta req", msgNum);
×
500
    goto _exit;
×
501
  }
502

503
  batchRsp.pRsps = taosArrayInit(msgNum, sizeof(SBatchRspMsg));
1,108,340✔
504
  if (NULL == batchRsp.pRsps) {
1,108,423!
UNCOV
505
    code = terrno;
×
506
    qError("taosArrayInit %d SBatchRspMsg failed", msgNum);
×
507
    goto _exit;
×
508
  }
509

510
  for (int32_t i = 0; i < msgNum; ++i) {
2,408,270✔
511
    req = taosArrayGet(batchReq.pMsgs, i);
1,301,082✔
512
    if (req == NULL) {
1,300,880✔
513
      code = terrno;
28✔
UNCOV
514
      goto _exit;
×
515
    }
516

517
    reqMsg.msgType = req->msgType;
1,300,852✔
518
    reqMsg.pCont = req->msg;
1,300,852✔
519
    reqMsg.contLen = req->msgLen;
1,300,852✔
520

521
    switch (req->msgType) {
1,300,852!
522
      case TDMT_VND_TABLE_META:
1,294,186✔
523
        // error code has been set into reqMsg, no need to handle it here.
524
        if (TSDB_CODE_SUCCESS != vnodeGetTableMeta(pVnode, &reqMsg, false)) {
1,294,186✔
525
          qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType);
32,626!
526
        }
527
        break;
1,293,892✔
528
      case TDMT_VND_TABLE_NAME:
538✔
529
        // error code has been set into reqMsg, no need to handle it here.
530
        if (TSDB_CODE_SUCCESS != vnodeGetTableMeta(pVnode, &reqMsg, false)) {
538✔
531
          qWarn("vnodeGetBatchName failed, msgType:%d", req->msgType);
137!
532
        }
533
        break;
538✔
534
      case TDMT_VND_TABLE_CFG:
95✔
535
        // error code has been set into reqMsg, no need to handle it here.
536
        if (TSDB_CODE_SUCCESS != vnodeGetTableCfg(pVnode, &reqMsg, false)) {
95!
UNCOV
537
          qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType);
×
538
        }
539
        break;
95✔
540
      case TDMT_VND_GET_STREAM_PROGRESS:
6,033✔
541
        // error code has been set into reqMsg, no need to handle it here.
542
        if (TSDB_CODE_SUCCESS != vnodeGetStreamProgress(pVnode, &reqMsg, false)) {
6,033!
UNCOV
543
          qWarn("vnodeGetBatchMeta failed, msgType:%d", req->msgType);
×
544
        }
545
        break;
6,034✔
UNCOV
546
      default:
×
UNCOV
547
        qError("invalid req msgType %d", req->msgType);
×
UNCOV
548
        reqMsg.code = TSDB_CODE_INVALID_MSG;
×
UNCOV
549
        reqMsg.pCont = NULL;
×
UNCOV
550
        reqMsg.contLen = 0;
×
UNCOV
551
        break;
×
552
    }
553

554
    rsp.msgIdx = req->msgIdx;
1,300,559✔
555
    rsp.reqType = reqMsg.msgType;
1,300,559✔
556
    rsp.msgLen = reqMsg.contLen;
1,300,559✔
557
    rsp.rspCode = reqMsg.code;
1,300,559✔
558
    rsp.msg = reqMsg.pCont;
1,300,559✔
559

560
    if (NULL == taosArrayPush(batchRsp.pRsps, &rsp)) {
2,600,396!
UNCOV
561
      qError("taosArrayPush failed");
×
UNCOV
562
      code = terrno;
×
UNCOV
563
      goto _exit;
×
564
    }
565
  }
566

567
  rspSize = tSerializeSBatchRsp(NULL, 0, &batchRsp);
1,107,188✔
568
  if (rspSize < 0) {
1,107,807!
UNCOV
569
    qError("tSerializeSBatchRsp failed");
×
UNCOV
570
    code = terrno;
×
571
    goto _exit;
×
572
  }
573
  pRsp = rpcMallocCont(rspSize);
1,107,807✔
574
  if (pRsp == NULL) {
1,107,950!
575
    qError("rpcMallocCont %d failed", rspSize);
×
576
    code = terrno;
×
UNCOV
577
    goto _exit;
×
578
  }
579
  if (tSerializeSBatchRsp(pRsp, rspSize, &batchRsp) < 0) {
1,107,950!
UNCOV
580
    qError("tSerializeSBatchRsp %d failed", rspSize);
×
UNCOV
581
    code = terrno;
×
UNCOV
582
    goto _exit;
×
583
  }
584

585
_exit:
1,108,555✔
586

587
  rspMsg.info = pMsg->info;
1,108,555✔
588
  rspMsg.pCont = pRsp;
1,108,555✔
589
  rspMsg.contLen = rspSize;
1,108,555✔
590
  rspMsg.code = code;
1,108,555✔
591
  rspMsg.msgType = pMsg->msgType;
1,108,555✔
592

593
  if (code) {
1,108,555!
UNCOV
594
    qError("vnd get batch meta failed cause of %s", tstrerror(code));
×
595
  }
596

597
  taosArrayDestroyEx(batchReq.pMsgs, tFreeSBatchReqMsg);
1,108,555✔
598
  taosArrayDestroyEx(batchRsp.pRsps, tFreeSBatchRspMsg);
1,108,125✔
599

600
  tmsgSendRsp(&rspMsg);
1,108,378✔
601

602
  return code;
1,108,229✔
603
}
604

605
#define VNODE_DO_META_QUERY(pVnode, cmd)                 \
606
  do {                                                   \
607
    (void)taosThreadRwlockRdlock(&(pVnode)->metaRWLock); \
608
    cmd;                                                 \
609
    (void)taosThreadRwlockUnlock(&(pVnode)->metaRWLock); \
610
  } while (0)
611

NEW
612
int32_t vnodeReadVSubtables(SReadHandle* pHandle, int64_t suid, SArray** ppRes) {
×
NEW
613
  int32_t                    code = TSDB_CODE_SUCCESS;
×
NEW
614
  int32_t                    line = 0;
×
NEW
615
  SMetaReader                mr = {0};
×
NEW
616
  bool                       readerInit = false;
×
NEW
617
  SVCTableRefCols*           pTb = NULL;
×
NEW
618
  int32_t                    refColsNum = 0;
×
619
  char                       tbFName[TSDB_TABLE_FNAME_LEN];
620
  
NEW
621
  SArray *pList = taosArrayInit(10, sizeof(uint64_t));
×
NEW
622
  QUERY_CHECK_NULL(pList, code, line, _return, terrno);
×
623
  
NEW
624
  QUERY_CHECK_CODE(pHandle->api.metaFn.getChildTableList(pHandle->vnode, suid, pList), line, _return);
×
625

NEW
626
  size_t num = taosArrayGetSize(pList);
×
NEW
627
  *ppRes = taosArrayInit(num, POINTER_BYTES);
×
NEW
628
  QUERY_CHECK_NULL(*ppRes, code, line, _return, terrno);
×
NEW
629
  SSHashObj* pSrcTbls = tSimpleHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
×
NEW
630
  QUERY_CHECK_NULL(pSrcTbls, code, line, _return, terrno);
×
631

NEW
632
  for (int32_t i = 0; i < num; ++i) {
×
NEW
633
    uint64_t* id = taosArrayGet(pList, i);
×
NEW
634
    QUERY_CHECK_NULL(id, code, line, _return, terrno);
×
NEW
635
    pHandle->api.metaReaderFn.initReader(&mr, pHandle->vnode, META_READER_LOCK, &pHandle->api.metaFn);
×
NEW
636
    QUERY_CHECK_CODE(pHandle->api.metaReaderFn.getTableEntryByUid(&mr, *id), line, _return);
×
NEW
637
    readerInit = true;
×
638

NEW
639
    refColsNum = 0;
×
NEW
640
    for (int32_t j = 0; j < mr.me.colRef.nCols; j++) {
×
NEW
641
      if (mr.me.colRef.pColRef[j].hasRef) {
×
NEW
642
        refColsNum++;
×
643
      }
644
    }
645

NEW
646
    if (refColsNum <= 0) {
×
NEW
647
      pHandle->api.metaReaderFn.clearReader(&mr);
×
NEW
648
      readerInit = false;
×
NEW
649
      continue;
×
650
    }
651

NEW
652
    pTb = taosMemoryCalloc(1, refColsNum * sizeof(SRefColInfo) + sizeof(*pTb));
×
NEW
653
    QUERY_CHECK_NULL(pTb, code, line, _return, terrno);
×
654

NEW
655
    pTb->uid = mr.me.uid;
×
NEW
656
    pTb->numOfColRefs = refColsNum;
×
NEW
657
    pTb->refCols = (SRefColInfo*)(pTb + 1);
×
658
    
NEW
659
    refColsNum = 0;
×
NEW
660
    tSimpleHashClear(pSrcTbls);
×
NEW
661
    for (int32_t j = 0; j < mr.me.colRef.nCols; j++) {
×
NEW
662
      if (!mr.me.colRef.pColRef[j].hasRef) {
×
NEW
663
        continue;
×
664
      }
665

NEW
666
      pTb->refCols[refColsNum].colId = mr.me.colRef.pColRef[j].id;
×
NEW
667
      tstrncpy(pTb->refCols[refColsNum].refColName, mr.me.colRef.pColRef[j].refColName, TSDB_COL_NAME_LEN);
×
NEW
668
      tstrncpy(pTb->refCols[refColsNum].refTableName, mr.me.colRef.pColRef[j].refTableName, TSDB_TABLE_NAME_LEN);
×
NEW
669
      tstrncpy(pTb->refCols[refColsNum].refDbName, mr.me.colRef.pColRef[j].refDbName, TSDB_DB_NAME_LEN);
×
670

NEW
671
      snprintf(tbFName, sizeof(tbFName), "%s.%s", pTb->refCols[refColsNum].refDbName, pTb->refCols[refColsNum].refTableName);
×
672

NEW
673
      if (NULL == tSimpleHashGet(pSrcTbls, tbFName, strlen(tbFName))) {
×
NEW
674
        QUERY_CHECK_CODE(tSimpleHashPut(pSrcTbls, tbFName, strlen(tbFName), &code, sizeof(code)), line, _return);
×
675
      }
676
      
NEW
677
      refColsNum++;
×
678
    }
679

NEW
680
    pTb->numOfSrcTbls = tSimpleHashGetSize(pSrcTbls);
×
NEW
681
    QUERY_CHECK_NULL(taosArrayPush(*ppRes, &pTb), code, line, _return, terrno);
×
NEW
682
    pTb = NULL;
×
683
    
NEW
684
    pHandle->api.metaReaderFn.clearReader(&mr);
×
NEW
685
    readerInit = false;
×
686
  }
687

NEW
688
_return:
×
689

NEW
690
  if (readerInit) {
×
NEW
691
    pHandle->api.metaReaderFn.clearReader(&mr);
×
692
  }
693

NEW
694
  taosArrayDestroy(pList);
×
NEW
695
  taosMemoryFree(pTb);
×
NEW
696
  tSimpleHashCleanup(pSrcTbls);
×
697
  
NEW
698
  if (code) {
×
NEW
699
    qError("%s failed since %s", __func__, tstrerror(code));
×
700
  }
NEW
701
  return code;
×
702
}
703

704

NEW
705
int32_t vnodeGetVSubtablesMeta(SVnode *pVnode, SRpcMsg *pMsg) {
×
NEW
706
  int32_t        code = 0;
×
NEW
707
  int32_t        rspSize = 0;
×
NEW
708
  SVSubTablesReq req = {0};
×
NEW
709
  SVSubTablesRsp rsp = {0};
×
NEW
710
  SRpcMsg      rspMsg = {0};
×
NEW
711
  void        *pRsp = NULL;
×
NEW
712
  int32_t      line = 0;
×
713

NEW
714
  if (tDeserializeSVSubTablesReq(pMsg->pCont, pMsg->contLen, &req)) {
×
NEW
715
    code = terrno;
×
NEW
716
    qError("tDeserializeSVSubTablesReq failed");
×
NEW
717
    goto _return;
×
718
  }
719

NEW
720
  SReadHandle handle = {.vnode = pVnode};
×
NEW
721
  initStorageAPI(&handle.api);
×
722

NEW
723
  QUERY_CHECK_CODE(vnodeReadVSubtables(&handle, req.suid, &rsp.pTables), line, _return);
×
NEW
724
  rsp.vgId = TD_VID(pVnode);
×
725

NEW
726
  rspSize = tSerializeSVSubTablesRsp(NULL, 0, &rsp);
×
NEW
727
  if (rspSize < 0) {
×
NEW
728
    code = rspSize;
×
NEW
729
    qError("tSerializeSVSubTablesRsp failed, error:%d", rspSize);
×
NEW
730
    goto _return;
×
731
  }
NEW
732
  pRsp = rpcMallocCont(rspSize);
×
NEW
733
  if (pRsp == NULL) {
×
NEW
734
    code = terrno;
×
NEW
735
    qError("rpcMallocCont %d failed, error:%d", rspSize, terrno);
×
NEW
736
    goto _return;
×
737
  }
NEW
738
  rspSize = tSerializeSVSubTablesRsp(pRsp, rspSize, &rsp);
×
NEW
739
  if (rspSize < 0) {
×
NEW
740
    code = rspSize;
×
NEW
741
    qError("tSerializeSVSubTablesRsp failed, error:%d", rspSize);
×
NEW
742
    goto _return;
×
743
  }
744

NEW
745
_return:
×
746

NEW
747
  rspMsg.info = pMsg->info;
×
NEW
748
  rspMsg.pCont = pRsp;
×
NEW
749
  rspMsg.contLen = rspSize;
×
NEW
750
  rspMsg.code = code;
×
NEW
751
  rspMsg.msgType = pMsg->msgType;
×
752

NEW
753
  if (code) {
×
NEW
754
    qError("vnd get virtual subtables failed cause of %s", tstrerror(code));
×
755
  }
756

NEW
757
  tDestroySVSubTablesRsp(&rsp);
×
758

NEW
759
  tmsgSendRsp(&rspMsg);
×
760

NEW
761
  return code;
×
762
}
763

764

765
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
1,166,108✔
766
  SSyncState state = syncGetState(pVnode->sync);
1,166,108✔
767
  pLoad->syncAppliedIndex = pVnode->state.applied;
1,166,108✔
768
  syncGetCommitIndex(pVnode->sync, &pLoad->syncCommitIndex);
1,166,108✔
769

770
  pLoad->vgId = TD_VID(pVnode);
1,166,108✔
771
  pLoad->syncState = state.state;
1,166,108✔
772
  pLoad->syncRestore = state.restored;
1,166,108✔
773
  pLoad->syncTerm = state.term;
1,166,108✔
774
  pLoad->roleTimeMs = state.roleTimeMs;
1,166,108✔
775
  pLoad->startTimeMs = state.startTimeMs;
1,166,108✔
776
  pLoad->syncCanRead = state.canRead;
1,166,108✔
777
  pLoad->learnerProgress = state.progress;
1,166,108✔
778
  pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
1,166,108✔
779
  pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
1,166,108✔
780
  VNODE_DO_META_QUERY(pVnode, pLoad->numOfTables = metaGetTbNum(pVnode->pMeta));
1,166,108✔
781
  VNODE_DO_META_QUERY(pVnode, pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1));
1,166,108✔
782
  pLoad->totalStorage = (int64_t)3 * 1073741824;
1,166,108✔
783
  pLoad->compStorage = (int64_t)2 * 1073741824;
1,166,108✔
784
  pLoad->pointsWritten = 100;
1,166,108✔
785
  pLoad->numOfSelectReqs = 1;
1,166,108✔
786
  pLoad->numOfInsertReqs = atomic_load_64(&pVnode->statis.nInsert);
1,166,108✔
787
  pLoad->numOfInsertSuccessReqs = atomic_load_64(&pVnode->statis.nInsertSuccess);
1,166,108✔
788
  pLoad->numOfBatchInsertReqs = atomic_load_64(&pVnode->statis.nBatchInsert);
1,166,108✔
789
  pLoad->numOfBatchInsertSuccessReqs = atomic_load_64(&pVnode->statis.nBatchInsertSuccess);
1,166,108✔
790
  return 0;
1,166,108✔
791
}
792

793
int32_t vnodeGetLoadLite(SVnode *pVnode, SVnodeLoadLite *pLoad) {
×
794
  SSyncState syncState = syncGetState(pVnode->sync);
×
UNCOV
795
  if (syncState.state == TAOS_SYNC_STATE_LEADER || syncState.state == TAOS_SYNC_STATE_ASSIGNED_LEADER) {
×
UNCOV
796
    pLoad->vgId = TD_VID(pVnode);
×
797
    pLoad->nTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta, 1);
×
798
    return 0;
×
799
  }
UNCOV
800
  return -1;
×
801
}
802
/**
803
 * @brief Reset the statistics value by monitor interval
804
 *
805
 * @param pVnode
806
 * @param pLoad
807
 */
808
void vnodeResetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
32✔
809
  VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsert, pLoad->numOfInsertReqs, 64, "nInsert");
32!
810
  VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nInsertSuccess, pLoad->numOfInsertSuccessReqs, 64, "nInsertSuccess");
32!
811
  VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsert, pLoad->numOfBatchInsertReqs, 64, "nBatchInsert");
32!
812
  VNODE_GET_LOAD_RESET_VALS(pVnode->statis.nBatchInsertSuccess, pLoad->numOfBatchInsertSuccessReqs, 64,
32!
813
                            "nBatchInsertSuccess");
814
}
32✔
815

816
void vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId, int64_t *numOfTables, int64_t *numOfNormalTables) {
967,098✔
817
  SVnode    *pVnodeObj = pVnode;
967,098✔
818
  SVnodeCfg *pConf = &pVnodeObj->config;
967,098✔
819

820
  if (dbname) {
967,098!
821
    *dbname = pConf->dbname;
967,724✔
822
  }
823

824
  if (vgId) {
967,098✔
825
    *vgId = TD_VID(pVnodeObj);
966,542✔
826
  }
827

828
  if (numOfTables) {
967,098!
UNCOV
829
    *numOfTables = pConf->vndStats.numOfNTables + pConf->vndStats.numOfCTables;
×
830
  }
831

832
  if (numOfNormalTables) {
967,098!
UNCOV
833
    *numOfNormalTables = pConf->vndStats.numOfNTables;
×
834
  }
835
}
967,098✔
836

837
int32_t vnodeGetTableList(void *pVnode, int8_t type, SArray *pList) {
×
UNCOV
838
  if (type == TSDB_SUPER_TABLE) {
×
UNCOV
839
    return vnodeGetStbIdList(pVnode, 0, pList);
×
840
  } else {
UNCOV
841
    return TSDB_CODE_INVALID_PARA;
×
842
  }
843
}
844

UNCOV
845
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
×
UNCOV
846
  int32_t      code = TSDB_CODE_SUCCESS;
×
847
  SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, uid, 1);
×
848
  if (NULL == pCur) {
×
849
    qError("vnode get all table list failed");
×
UNCOV
850
    return terrno;
×
851
  }
852

UNCOV
853
  while (1) {
×
UNCOV
854
    tb_uid_t id = metaCtbCursorNext(pCur);
×
UNCOV
855
    if (id == 0) {
×
UNCOV
856
      break;
×
857
    }
858

UNCOV
859
    STableKeyInfo info = {uid = id};
×
860
    if (NULL == taosArrayPush(list, &info)) {
×
861
      qError("taosArrayPush failed");
×
862
      code = terrno;
×
863
      goto _exit;
×
864
    }
865
  }
866
_exit:
×
867
  metaCloseCtbCursor(pCur);
×
868
  return code;
×
869
}
870

UNCOV
871
int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg) {
×
872
  return 0;
×
873
}
874

875
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
2,694,738✔
876
  int32_t      code = TSDB_CODE_SUCCESS;
2,694,738✔
877
  SVnode      *pVnodeObj = pVnode;
2,694,738✔
878
  SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj, suid, 1);
2,694,738✔
879
  if (NULL == pCur) {
2,696,123!
UNCOV
880
    qError("vnode get all table list failed");
×
UNCOV
881
    return terrno;
×
882
  }
883

884
  while (1) {
8,818,543✔
885
    tb_uid_t id = metaCtbCursorNext(pCur);
11,514,666✔
886
    if (id == 0) {
11,511,471✔
887
      break;
2,700,784✔
888
    }
889

890
    if (NULL == taosArrayPush(list, &id)) {
8,818,543!
891
      qError("taosArrayPush failed");
×
UNCOV
892
      code = terrno;
×
UNCOV
893
      goto _exit;
×
894
    }
895
  }
896

897
_exit:
2,700,784✔
898
  metaCloseCtbCursor(pCur);
2,700,784✔
899
  return code;
2,698,010✔
900
}
901

902
int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list) {
165,348✔
903
  int32_t      code = TSDB_CODE_SUCCESS;
165,348✔
904
  SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid);
165,348✔
905
  if (!pCur) {
165,351!
UNCOV
906
    return TSDB_CODE_OUT_OF_MEMORY;
×
907
  }
908

909
  while (1) {
153,675✔
910
    tb_uid_t id = metaStbCursorNext(pCur);
319,026✔
911
    if (id == 0) {
319,029✔
912
      break;
165,349✔
913
    }
914

915
    if (NULL == taosArrayPush(list, &id)) {
153,675!
UNCOV
916
      qError("taosArrayPush failed");
×
UNCOV
917
      code = terrno;
×
UNCOV
918
      goto _exit;
×
919
    }
920
  }
921

922
_exit:
165,349✔
923
  metaCloseStbCursor(pCur);
165,349✔
924
  return code;
165,352✔
925
}
926

UNCOV
927
int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg, void *arg1),
×
928
                                  void *arg) {
UNCOV
929
  int32_t      code = TSDB_CODE_SUCCESS;
×
UNCOV
930
  SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, suid);
×
UNCOV
931
  if (!pCur) {
×
UNCOV
932
    return terrno;
×
933
  }
934

UNCOV
935
  while (1) {
×
UNCOV
936
    tb_uid_t id = metaStbCursorNext(pCur);
×
UNCOV
937
    if (id == 0) {
×
UNCOV
938
      break;
×
939
    }
940

UNCOV
941
    if ((*filter) && (*filter)(arg, &id)) {
×
UNCOV
942
      continue;
×
943
    }
944

UNCOV
945
    if (NULL == taosArrayPush(list, &id)) {
×
UNCOV
946
      qError("taosArrayPush failed");
×
UNCOV
947
      code = terrno;
×
UNCOV
948
      goto _exit;
×
949
    }
950
  }
951

UNCOV
952
_exit:
×
UNCOV
953
  metaCloseStbCursor(pCur);
×
UNCOV
954
  return code;
×
955
}
956

957
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
22,838✔
958
  SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, suid, 0);
22,838✔
959
  if (!pCur) {
22,837!
UNCOV
960
    return terrno;
×
961
  }
962

963
  *num = 0;
22,839✔
964
  while (1) {
21,095✔
965
    tb_uid_t id = metaCtbCursorNext(pCur);
43,934✔
966
    if (id == 0) {
43,940✔
967
      break;
22,845✔
968
    }
969

970
    ++(*num);
21,095✔
971
  }
972

973
  metaCloseCtbCursor(pCur);
22,845✔
974
  return TSDB_CODE_SUCCESS;
22,844✔
975
}
976

977
int32_t vnodeGetStbColumnNum(SVnode *pVnode, tb_uid_t suid, int *num) {
22,840✔
978
  SSchemaWrapper *pSW = metaGetTableSchema(pVnode->pMeta, suid, -1, 0, NULL);
22,840✔
979
  if (pSW) {
22,845!
980
    *num = pSW->nCols;
22,845!
981
    tDeleteSchemaWrapper(pSW);
982
  } else {
983
    *num = 2;
×
984
  }
985

986
  return TSDB_CODE_SUCCESS;
22,844✔
987
}
988

989
int32_t vnodeGetStbKeep(SVnode *pVnode, tb_uid_t suid, int64_t *keep) {
22,841✔
990
  SMetaReader mr = {0};
22,841✔
991
  metaReaderDoInit(&mr, pVnode->pMeta, META_READER_NOLOCK);
22,841✔
992

993
  int32_t code = metaReaderGetTableEntryByUid(&mr, suid);
22,843✔
994
  if (code == TSDB_CODE_SUCCESS) {
22,842!
995
    *keep = mr.me.stbEntry.keep;
22,842✔
996
  } else {
UNCOV
997
    *keep = 0;  // Default value if not found
×
998
  }
999

1000
  metaReaderClear(&mr);
22,842✔
1001
  return TSDB_CODE_SUCCESS;
22,844✔
1002
}
1003

1004
#ifdef TD_ENTERPRISE
1005
const char *tkLogStb[] = {"cluster_info",
1006
                          "data_dir",
1007
                          "dnodes_info",
1008
                          "d_info",
1009
                          "grants_info",
1010
                          "keeper_monitor",
1011
                          "logs",
1012
                          "log_dir",
1013
                          "log_summary",
1014
                          "m_info",
1015
                          "taosadapter_restful_http_request_fail",
1016
                          "taosadapter_restful_http_request_in_flight",
1017
                          "taosadapter_restful_http_request_summary_milliseconds",
1018
                          "taosadapter_restful_http_request_total",
1019
                          "taosadapter_system_cpu_percent",
1020
                          "taosadapter_system_mem_percent",
1021
                          "temp_dir",
1022
                          "vgroups_info",
1023
                          "vnodes_role"};
1024
const char *tkAuditStb[] = {"operations"};
1025
const int   tkLogStbNum = ARRAY_SIZE(tkLogStb);
1026
const int   tkAuditStbNum = ARRAY_SIZE(tkAuditStb);
1027

1028
// exclude stbs of taoskeeper log
1029
static int32_t vnodeGetTimeSeriesBlackList(SVnode *pVnode, int32_t *tbSize) {
165,333✔
1030
  int32_t      code = TSDB_CODE_SUCCESS;
165,333✔
1031
  int32_t      tbNum = 0;
165,333✔
1032
  const char **pTbArr = NULL;
165,333✔
1033
  const char  *dbName = NULL;
165,333✔
1034
  *tbSize = 0;
165,333✔
1035

1036
  if (!(dbName = strchr(pVnode->config.dbname, '.'))) return 0;
165,333!
1037
  if (0 == strncmp(++dbName, "log", TSDB_DB_NAME_LEN)) {
165,333!
UNCOV
1038
    tbNum = tkLogStbNum;
×
UNCOV
1039
    pTbArr = (const char **)&tkLogStb;
×
1040
  } else if (0 == strncmp(dbName, "audit", TSDB_DB_NAME_LEN)) {
165,333!
UNCOV
1041
    tbNum = tkAuditStbNum;
×
UNCOV
1042
    pTbArr = (const char **)&tkAuditStb;
×
1043
  }
1044
  if (tbNum && pTbArr) {
165,333!
UNCOV
1045
    *tbSize = metaSizeOfTbFilterCache(pVnode->pMeta, 0);
×
1046
    if (*tbSize < tbNum) {
×
1047
      for (int32_t i = 0; i < tbNum; ++i) {
×
1048
        tb_uid_t suid = metaGetTableEntryUidByName(pVnode->pMeta, pTbArr[i]);
×
1049
        if (suid != 0) {
×
UNCOV
1050
          code = metaPutTbToFilterCache(pVnode->pMeta, &suid, 0);
×
UNCOV
1051
          if (TSDB_CODE_SUCCESS != code) {
×
1052
            return code;
×
1053
          }
1054
        }
1055
      }
1056
      *tbSize = metaSizeOfTbFilterCache(pVnode->pMeta, 0);
×
1057
    }
1058
  }
1059

1060
  return code;
165,333✔
1061
}
1062
#endif
1063

UNCOV
1064
static bool vnodeTimeSeriesFilter(void *arg1, void *arg2) {
×
UNCOV
1065
  SVnode *pVnode = (SVnode *)arg1;
×
1066

UNCOV
1067
  if (metaTbInFilterCache(pVnode->pMeta, arg2, 0)) {
×
UNCOV
1068
    return true;
×
1069
  }
1070
  return false;
×
1071
}
1072

1073
int32_t vnodeGetTimeSeriesNum(SVnode *pVnode, int64_t *num) {
165,332✔
1074
  SArray *suidList = NULL;
165,332✔
1075

1076
  if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) {
165,332!
UNCOV
1077
    return terrno;
×
1078
  }
1079

1080
  int32_t tbFilterSize = 0;
165,333✔
1081
  int32_t code = TSDB_CODE_SUCCESS;
165,333✔
1082
#ifdef TD_ENTERPRISE
1083
  code = vnodeGetTimeSeriesBlackList(pVnode, &tbFilterSize);
165,333✔
1084
  if (TSDB_CODE_SUCCESS != code) {
165,333!
UNCOV
1085
    goto _exit;
×
1086
  }
1087
#endif
1088

1089
  if ((!tbFilterSize && vnodeGetStbIdList(pVnode, 0, suidList) < 0) ||
165,333!
1090
      (tbFilterSize && vnodeGetStbIdListByFilter(pVnode, 0, suidList, vnodeTimeSeriesFilter, pVnode) < 0)) {
165,334!
UNCOV
1091
    qError("vgId:%d, failed to get stb id list error: %s", TD_VID(pVnode), terrstr());
×
UNCOV
1092
    taosArrayDestroy(suidList);
×
UNCOV
1093
    return terrno;
×
1094
  }
1095

1096
  *num = 0;
165,336✔
1097
  int64_t arrSize = taosArrayGetSize(suidList);
165,336✔
1098
  for (int64_t i = 0; i < arrSize; ++i) {
319,027✔
1099
    tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i);
153,694✔
1100

1101
    int64_t ctbNum = 0;
153,690✔
1102
    int32_t numOfCols = 0;
153,690✔
1103
    code = metaGetStbStats(pVnode, suid, &ctbNum, &numOfCols);
153,690✔
1104
    if (TSDB_CODE_SUCCESS != code) {
153,695!
UNCOV
1105
      goto _exit;
×
1106
    }
1107
    *num += ctbNum * (numOfCols - 1);
153,695✔
1108
  }
1109

1110
_exit:
165,333✔
1111
  taosArrayDestroy(suidList);
165,333✔
1112
  return TSDB_CODE_SUCCESS;
165,337✔
1113
}
1114

UNCOV
1115
int32_t vnodeGetAllCtbNum(SVnode *pVnode, int64_t *num) {
×
UNCOV
1116
  SMStbCursor *pCur = metaOpenStbCursor(pVnode->pMeta, 0);
×
UNCOV
1117
  if (!pCur) {
×
UNCOV
1118
    return terrno;
×
1119
  }
1120

UNCOV
1121
  *num = 0;
×
UNCOV
1122
  while (1) {
×
UNCOV
1123
    tb_uid_t id = metaStbCursorNext(pCur);
×
UNCOV
1124
    if (id == 0) {
×
UNCOV
1125
      break;
×
1126
    }
1127

UNCOV
1128
    int64_t ctbNum = 0;
×
UNCOV
1129
    int32_t code = vnodeGetCtbNum(pVnode, id, &ctbNum);
×
UNCOV
1130
    if (TSDB_CODE_SUCCESS != code) {
×
UNCOV
1131
      metaCloseStbCursor(pCur);
×
UNCOV
1132
      return code;
×
1133
    }
1134

UNCOV
1135
    *num += ctbNum;
×
1136
  }
1137

UNCOV
1138
  metaCloseStbCursor(pCur);
×
UNCOV
1139
  return TSDB_CODE_SUCCESS;
×
1140
}
1141

1142
void *vnodeGetIdx(void *pVnode) {
52,982✔
1143
  if (pVnode == NULL) {
52,982!
1144
    return NULL;
×
1145
  }
1146

1147
  return metaGetIdx(((SVnode *)pVnode)->pMeta);
52,982✔
1148
}
1149

1150
void *vnodeGetIvtIdx(void *pVnode) {
52,959✔
1151
  if (pVnode == NULL) {
52,959!
1152
    return NULL;
×
1153
  }
1154
  return metaGetIvtIdx(((SVnode *)pVnode)->pMeta);
52,959✔
1155
}
1156

1157
int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) {
254✔
1158
  return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid);
254✔
1159
}
1160

1161
int32_t vnodeGetDBSize(void *pVnode, SDbSizeStatisInfo *pInfo) {
22✔
1162
  SVnode *pVnodeObj = pVnode;
22✔
1163
  if (pVnodeObj == NULL) {
22!
1164
    return TSDB_CODE_VND_NOT_EXIST;
×
1165
  }
1166
  int32_t code = 0;
22✔
1167
  char    path[TSDB_FILENAME_LEN] = {0};
22✔
1168

1169
  char   *dirName[] = {VNODE_TSDB_DIR, VNODE_WAL_DIR, VNODE_META_DIR, VNODE_TSDB_CACHE_DIR};
22✔
1170
  int64_t dirSize[4];
1171

1172
  vnodeGetPrimaryDir(pVnodeObj->path, pVnodeObj->diskPrimary, pVnodeObj->pTfs, path, TSDB_FILENAME_LEN);
22✔
1173
  int32_t offset = strlen(path);
22✔
1174

1175
  for (int i = 0; i < sizeof(dirName) / sizeof(dirName[0]); i++) {
110✔
1176
    int64_t size = {0};
88✔
1177
    (void)snprintf(path + offset, TSDB_FILENAME_LEN, "%s%s", TD_DIRSEP, dirName[i]);
88✔
1178
    code = taosGetDirSize(path, &size);
88✔
1179
    if (code != 0) {
88!
1180
      return code;
×
1181
    }
1182
    path[offset] = 0;
88✔
1183
    dirSize[i] = size;
88✔
1184
  }
1185

1186
  pInfo->l1Size = dirSize[0] - dirSize[3];
22✔
1187
  pInfo->walSize = dirSize[1];
22✔
1188
  pInfo->metaSize = dirSize[2];
22✔
1189
  pInfo->cacheSize = dirSize[3];
22✔
1190

1191
  code = tsdbGetS3Size(pVnodeObj->pTsdb, &pInfo->s3Size);
22✔
1192

1193
  return code;
22✔
1194
}
1195

1196
int32_t vnodeGetStreamProgress(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
6,033✔
1197
  int32_t            code = 0;
6,033✔
1198
  SStreamProgressReq req;
1199
  SStreamProgressRsp rsp = {0};
6,033✔
1200
  SRpcMsg            rpcMsg = {.info = pMsg->info, .code = 0};
6,033✔
1201
  char              *buf = NULL;
6,033✔
1202
  int32_t            rspLen = 0;
6,033✔
1203
  code = tDeserializeStreamProgressReq(pMsg->pCont, pMsg->contLen, &req);
6,033✔
1204

1205
  if (code == TSDB_CODE_SUCCESS) {
6,034!
1206
    rsp.fetchIdx = req.fetchIdx;
6,034✔
1207
    rsp.subFetchIdx = req.subFetchIdx;
6,034✔
1208
    rsp.vgId = req.vgId;
6,034✔
1209
    rsp.streamId = req.streamId;
6,034✔
1210
    rspLen = tSerializeStreamProgressRsp(0, 0, &rsp);
6,034✔
1211
    if (rspLen < 0) {
6,034!
UNCOV
1212
      code = terrno;
×
UNCOV
1213
      goto _OVER;
×
1214
    }
1215
    if (direct) {
6,034!
UNCOV
1216
      buf = rpcMallocCont(rspLen);
×
1217
    } else {
1218
      buf = taosMemoryCalloc(1, rspLen);
6,034!
1219
    }
1220
    if (!buf) {
6,033!
UNCOV
1221
      code = terrno;
×
UNCOV
1222
      goto _OVER;
×
1223
    }
1224
  }
1225

1226
  if (code == TSDB_CODE_SUCCESS) {
6,033!
1227
    code = tqGetStreamExecInfo(pVnode, req.streamId, &rsp.progressDelay, &rsp.fillHisFinished);
6,033✔
1228
  }
1229
  if (code == TSDB_CODE_SUCCESS) {
6,034!
1230
    rspLen = tSerializeStreamProgressRsp(buf, rspLen, &rsp);
6,034✔
1231
    if (rspLen < 0) {
6,034!
UNCOV
1232
      code = terrno;
×
UNCOV
1233
      goto _OVER;
×
1234
    }
1235
    rpcMsg.pCont = buf;
6,034✔
1236
    buf = NULL;
6,034✔
1237
    rpcMsg.contLen = rspLen;
6,034✔
1238
    rpcMsg.code = code;
6,034✔
1239
    rpcMsg.msgType = pMsg->msgType;
6,034✔
1240
    if (direct) {
6,034!
UNCOV
1241
      tmsgSendRsp(&rpcMsg);
×
1242
    } else {
1243
      *pMsg = rpcMsg;
6,034✔
1244
    }
1245
  }
1246

UNCOV
1247
_OVER:
×
1248
  if (buf) {
6,034!
UNCOV
1249
    taosMemoryFree(buf);
×
1250
  }
1251
  return code;
6,034✔
1252
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc