• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.71
/source/dnode/mnode/impl/src/mndInfoSchema.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#define _DEFAULT_SOURCE
17
#include "mndInt.h"
18
#include "systable.h"
19

20
/*
 * Builds a freshly allocated SSchema array from a system-table column
 * description.
 *
 * pSrc   - source column descriptors, colNum entries are read
 * colNum - number of columns to convert
 * pDst   - receives the new array on success; left untouched on failure
 *
 * Column ids are assigned sequentially starting at 1. Columns whose source
 * descriptor has sysInfo set get the COL_IS_SYSINFO flag.
 *
 * Returns 0 on success, or the terrno value observed after the allocation
 * failed. The caller owns the array stored in *pDst.
 */
static int32_t mndInitInfosTableSchema(const SSysDbTableSchema *pSrc, int32_t colNum, SSchema **pDst) {
  int32_t  code = 0;
  SSchema *pCols = taosMemoryCalloc(colNum, sizeof(SSchema));
  if (pCols == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  for (int32_t idx = 0; idx < colNum; ++idx) {
    const SSysDbTableSchema *pFrom = &pSrc[idx];
    SSchema                 *pTo = &pCols[idx];

    tstrncpy(pTo->name, pFrom->name, sizeof(pTo->name));
    pTo->type = pFrom->type;
    pTo->colId = idx + 1;  // column ids are 1-based
    pTo->bytes = pFrom->bytes;
    if (pFrom->sysInfo) {
      // flags starts at zero because the array was calloc'ed
      pTo->flags |= COL_IS_SYSINFO;
    }
  }

  *pDst = pCols;
  TAOS_RETURN(code);
}
41

42
/*
 * Fills `hash` with one STableMetaRsp entry per table reported by
 * getInfosDbMeta(), keyed by table name.
 *
 * Every entry shares the same database name (TSDB_INFORMATION_SCHEMA_DB),
 * table type (TSDB_SYSTEM_TABLE) and schema/tag versions (1); the table name,
 * column count, sysInfo flag and schema array are set per table.
 *
 * The schema array of each entry is heap-allocated by
 * mndInitInfosTableSchema(). Once the entry is stored in the hash, the array
 * is released by mndCleanupInfos(). If storing the entry fails, the array was
 * never handed over, so it is freed here before returning.
 *
 * Returns 0 on success, otherwise the error from schema construction or the
 * terrno value observed after taosHashPut() failed.
 */
static int32_t mndInsInitMeta(SHashObj *hash) {
  int32_t       code = 0;
  STableMetaRsp meta = {0};

  tstrncpy(meta.dbFName, TSDB_INFORMATION_SCHEMA_DB, sizeof(meta.dbFName));
  meta.tableType = TSDB_SYSTEM_TABLE;
  meta.sversion = 1;
  meta.tversion = 1;
  meta.virtualStb = false;

  size_t               size = 0;
  const SSysTableMeta *pInfosTableMeta = NULL;
  getInfosDbMeta(&pInfosTableMeta, &size);

  // size_t index: avoids a signed/unsigned comparison against `size`
  for (size_t i = 0; i < size; ++i) {
    tstrncpy(meta.tbName, pInfosTableMeta[i].name, sizeof(meta.tbName));
    meta.numOfColumns = pInfosTableMeta[i].colNum;
    meta.sysInfo = pInfosTableMeta[i].sysInfo;

    TAOS_CHECK_RETURN(mndInitInfosTableSchema(pInfosTableMeta[i].schema, pInfosTableMeta[i].colNum, &meta.pSchemas));

    if (taosHashPut(hash, meta.tbName, strlen(meta.tbName), &meta, sizeof(meta))) {
      // Capture the error first, then release the schema that was never
      // stored in the hash; otherwise it would leak.
      code = terrno;
      taosMemoryFreeClear(meta.pSchemas);
      TAOS_RETURN(code);
    }
  }

  TAOS_RETURN(code);
}
71

72
int32_t mndBuildInsTableSchema(SMnode *pMnode, const char *dbFName, const char *tbName, bool sysinfo,
112,825✔
73
                               STableMetaRsp *pRsp) {
74
  int32_t code = 0;
112,825✔
75
  if (NULL == pMnode->infosMeta) {
112,825!
76
    code = TSDB_CODE_APP_ERROR;
×
UNCOV
77
    TAOS_RETURN(code);
×
78
  }
79

80
  STableMetaRsp *pMeta = NULL;
112,825✔
81
  if (strcmp(tbName, TSDB_INS_TABLE_USERS_FULL) == 0) {
112,825!
UNCOV
82
    pMeta = taosHashGet(pMnode->infosMeta, TSDB_INS_TABLE_USERS_FULL, strlen(tbName));
×
83
  } else {
84
    pMeta = taosHashGet(pMnode->infosMeta, tbName, strlen(tbName));
112,825✔
85
  }
86

87
  if (NULL == pMeta) {
112,825✔
88
    mError("invalid information schema table name:%s", tbName);
78!
89
    code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
78✔
90
    TAOS_RETURN(code);
78✔
91
  }
92

93
  if (!sysinfo && pMeta->sysInfo) {
112,747✔
94
    mError("no permission to get schema of table name:%s", tbName);
25!
95
    code = TSDB_CODE_PAR_PERMISSION_DENIED;
25✔
96
    TAOS_RETURN(code);
25✔
97
  }
98

99
  *pRsp = *pMeta;
112,722✔
100

101
  pRsp->pSchemas = taosMemoryCalloc(pMeta->numOfColumns, sizeof(SSchema));
112,722!
102
  if (pRsp->pSchemas == NULL) {
112,722!
103
    code = terrno;
×
104
    pRsp->pSchemas = NULL;
×
UNCOV
105
    TAOS_RETURN(code);
×
106
  }
107

108
  memcpy(pRsp->pSchemas, pMeta->pSchemas, pMeta->numOfColumns * sizeof(SSchema));
112,722✔
109
  TAOS_RETURN(code);
112,722✔
110
}
111

112
/*
 * Fills a table-config response for an information_schema table from the
 * cached metadata in pMnode->infosMeta.
 *
 * pMnode  - mnode whose infosMeta hash holds the cached metadata
 * dbFName - not used by this function
 * tbName  - name of the table to look up
 * pRsp    - response to fill; receives names, counts, table type and three
 *           newly allocated arrays (pSchemas, pSchemaExt, pColRefs), each with
 *           numOfColumns elements
 *
 * pSchemas is a copy of the cached columns; pSchemaExt and pColRefs are
 * zero-initialised.
 *
 * Returns:
 *   TSDB_CODE_APP_ERROR           if the infosMeta hash is not initialised
 *   TSDB_CODE_PAR_TABLE_NOT_EXIST if tbName is not in the hash
 *   terrno                        if any of the three arrays cannot be allocated
 *   0                             on success
 *
 * On allocation failure every array allocated so far is released and the
 * corresponding pRsp pointers are reset, so the caller never sees a
 * partially built response.
 */
int32_t mndBuildInsTableCfg(SMnode *pMnode, const char *dbFName, const char *tbName, STableCfgRsp *pRsp) {
  int32_t code = 0;
  if (NULL == pMnode->infosMeta) {
    code = TSDB_CODE_APP_ERROR;
    TAOS_RETURN(code);
  }

  STableMetaRsp *pMeta = taosHashGet(pMnode->infosMeta, tbName, strlen(tbName));
  if (NULL == pMeta) {
    mError("invalid information schema table name:%s", tbName);
    code = TSDB_CODE_PAR_TABLE_NOT_EXIST;
    TAOS_RETURN(code);
  }

  tstrncpy(pRsp->tbName, pMeta->tbName, sizeof(pRsp->tbName));
  tstrncpy(pRsp->stbName, pMeta->stbName, sizeof(pRsp->stbName));
  tstrncpy(pRsp->dbFName, pMeta->dbFName, sizeof(pRsp->dbFName));
  pRsp->numOfTags = pMeta->numOfTags;
  pRsp->numOfColumns = pMeta->numOfColumns;
  pRsp->tableType = pMeta->tableType;
  pRsp->virtualStb = pMeta->virtualStb;

  pRsp->pSchemas = taosMemoryCalloc(pMeta->numOfColumns, sizeof(SSchema));
  if (pRsp->pSchemas == NULL) {
    code = terrno;
    TAOS_RETURN(code);
  }

  memcpy(pRsp->pSchemas, pMeta->pSchemas, pMeta->numOfColumns * sizeof(SSchema));

  pRsp->pSchemaExt = taosMemoryCalloc(pMeta->numOfColumns, sizeof(SSchemaExt));
  if (pRsp->pSchemaExt == NULL) {
    code = terrno;
    taosMemoryFreeClear(pRsp->pSchemas);
    TAOS_RETURN(code);
  }

  // Uses taosMemoryCalloc like every other allocation in this file.
  pRsp->pColRefs = taosMemoryCalloc(pMeta->numOfColumns, sizeof(SColRef));
  if (pRsp->pColRefs == NULL) {
    code = terrno;
    taosMemoryFreeClear(pRsp->pSchemaExt);
    taosMemoryFreeClear(pRsp->pSchemas);
    TAOS_RETURN(code);
  }

  TAOS_RETURN(code);
}
147

148
/*
 * Creates the information_schema metadata hash on the mnode and populates it.
 *
 * The hash is created with an initial capacity of 20, the default VARCHAR
 * hash function and HASH_NO_LOCK, and stored in pMnode->infosMeta.
 *
 * Returns TSDB_CODE_OUT_OF_MEMORY if the hash cannot be created, otherwise
 * the result of mndInsInitMeta().
 */
int32_t mndInitInfos(SMnode *pMnode) {
  pMnode->infosMeta = taosHashInit(20, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), false, HASH_NO_LOCK);

  if (pMnode->infosMeta != NULL) {
    return mndInsInitMeta(pMnode->infosMeta);
  }

  TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
156

157
/*
 * Releases the information_schema metadata hash of the mnode.
 *
 * Each stored entry owns a heap-allocated schema array, which is freed before
 * the hash itself is destroyed. Does nothing when the hash was never created.
 * pMnode->infosMeta is reset to NULL afterwards.
 */
void mndCleanupInfos(SMnode *pMnode) {
  if (pMnode->infosMeta != NULL) {
    // Walk every entry and free the schema array it owns.
    for (STableMetaRsp *pEntry = taosHashIterate(pMnode->infosMeta, NULL); pEntry != NULL;
         pEntry = taosHashIterate(pMnode->infosMeta, pEntry)) {
      taosMemoryFreeClear(pEntry->pSchemas);
    }

    taosHashCleanup(pMnode->infosMeta);
    pMnode->infosMeta = NULL;
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc