• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove useless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original tables into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/source/dnode/vnode/src/meta/metaSma.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "meta.h"
17
#include "vnodeInt.h"
18

19
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME);
20
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME);
21

UNCOV
22
/**
 * Create a TSMA entry in the meta store.
 *
 * The index uid is first looked up through a meta reader opened with
 * META_READER_LOCK. If the lookup returns 0, the request is rejected with
 * TSDB_CODE_TSMA_ALREADY_EXIST. Otherwise an SMetaEntry of type
 * TSDB_TSMA_TABLE is filled from `version` and `pCfg` and handed to
 * metaHandleSmaEntry().
 *
 * @param pMeta    meta handle the entry is written to
 * @param version  version stored in the new entry
 * @param pCfg     TSMA configuration; its indexUid / indexName become the
 *                 entry's uid / name, and the entry points at pCfg itself
 *                 (no copy is made here)
 * @return 0 on success; TSDB_CODE_TSMA_ALREADY_EXIST (also stored in terrno)
 *         when the uid lookup succeeds; otherwise the non-zero code returned
 *         by metaHandleSmaEntry()
 */
int32_t metaCreateTSma(SMeta *pMeta, int64_t version, SSmaCfg *pCfg) {
  // TODO: Validate the cfg
  // The table uid should exist and be a super table or a normal table.
  // Check other cfg values

  SMetaEntry  me = {0};
  SMetaReader mr = {0};
  int32_t     code = 0;

  // validate req: a successful lookup of the index uid is treated as "already exists"
  metaReaderDoInit(&mr, pMeta, META_READER_LOCK);
  if (metaReaderGetTableEntryByUidCache(&mr, pCfg->indexUid) == 0) {
    metaReaderClear(&mr);
    return terrno = TSDB_CODE_TSMA_ALREADY_EXIST;
  }
  metaReaderClear(&mr);

  // set structs
  me.version = version;
  me.type = TSDB_TSMA_TABLE;
  me.uid = pCfg->indexUid;
  me.name = pCfg->indexName;
  me.smaEntry.tsma = pCfg;

  // save smaIndex
  code = metaHandleSmaEntry(pMeta, &me);
  if (code) goto _err;

  metaDebug("vgId:%d, tsma is created, name:%s uid:%" PRId64, TD_VID(pMeta->pVnode), pCfg->indexName, pCfg->indexUid);

  return 0;

_err:
  // The message reports terrno, while the caller receives `code`.
  metaError("vgId:%d, failed to create tsma:%s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pCfg->indexName,
            pCfg->indexUid, tstrerror(terrno));
  return code;
}
71

72
/**
 * Drop the TSMA identified by `indexUid`.
 *
 * When META_REFACT is defined this function performs no removal and always
 * reports success. Otherwise it calls metaRemoveSmaFromDb() and maps a
 * negative result to -1.
 *
 * @param pMeta     meta handle
 * @param indexUid  uid of the TSMA index to drop
 * @return TSDB_CODE_SUCCESS, or -1 if metaRemoveSmaFromDb() fails
 */
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) {
  // TODO: Validate the cfg
  // TODO: add atomicity

#if !defined(META_REFACT)
  if (metaRemoveSmaFromDb(pMeta, indexUid) < 0) {
    // TODO: handle error
    return -1;
  }
#endif

  return TSDB_CODE_SUCCESS;
}
85

UNCOV
86
/**
 * Encode a meta entry and insert it into table.db.
 *
 * The record key is {pME->version, pME->uid}; the value is the byte stream
 * produced by metaEncodeEntry(). The value buffer is allocated here and is
 * freed before returning on every path.
 *
 * @param pMeta  meta handle; pMeta->pTbDb and pMeta->txn are used for the insert
 * @param pME    entry to persist
 * @return 0 on success, -1 if sizing, allocation, encoding or the insert fails
 */
static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
  STbDbKey tbDbKey;
  void    *pVal = NULL;
  int      vLen = 0;
  int32_t  ret = 0;
  SEncoder coder = {0};

  // set key
  tbDbKey.version = pME->version;
  tbDbKey.uid = pME->uid;

  // set value: first pass computes the encoded size, second pass fills the buffer
  tEncodeSize(metaEncodeEntry, pME, vLen, ret);
  if (ret < 0) {
    goto _err;
  }

  pVal = taosMemoryMalloc(vLen);
  if (pVal == NULL) {
    // terrno is left untouched here
    goto _err;
  }

  tEncoderInit(&coder, pVal, vLen);
  ret = metaEncodeEntry(&coder, pME);
  // Release the encoder whether or not encoding succeeded, so the failure
  // path no longer skips tEncoderClear().
  tEncoderClear(&coder);
  if (ret < 0) {
    goto _err;
  }

  // write to table.db
  if (tdbTbInsert(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), pVal, vLen, pMeta->txn) < 0) {
    goto _err;
  }

  taosMemoryFree(pVal);
  return 0;

_err:
  taosMemoryFree(pVal);
  return -1;
}
133

UNCOV
134
/**
 * Insert the uid.idx record for a TSMA entry.
 *
 * Key: pME->uid. Value: an SUidIdxVal whose suid is the TSMA's indexUid,
 * whose version is the entry version, and whose skmVer is 0.
 *
 * @return the result of tdbTbInsert()
 */
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
  SUidIdxVal val = {
      .suid = pME->smaEntry.tsma->indexUid,
      .version = pME->version,
      .skmVer = 0,
  };

  return tdbTbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &val, sizeof(val), pMeta->txn);
}
138

UNCOV
139
/**
 * Insert the name.idx record for an entry.
 *
 * Key: the entry name including its terminating NUL. Value: pME->uid.
 *
 * @return the result of tdbTbInsert()
 */
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
  // +1 so the terminating NUL is part of the stored key
  size_t keyLen = strlen(pME->name) + 1;

  return tdbTbInsert(pMeta->pNameIdx, pME->name, keyLen, &pME->uid, sizeof(tb_uid_t), pMeta->txn);
}
142

UNCOV
143
/**
 * Insert the sma.idx record for a TSMA entry.
 *
 * The key is {tableUid, indexUid} taken from the entry's TSMA config; the
 * record carries no value (NULL, length 0).
 *
 * @return the result of tdbTbInsert()
 */
static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
  SSmaIdxKey key = {
      .uid = pME->smaEntry.tsma->tableUid,
      .smaUid = pME->smaEntry.tsma->indexUid,
  };

  return tdbTbInsert(pMeta->pSmaIdx, &key, sizeof(key), NULL, 0, pMeta->txn);
}
148

UNCOV
149
/**
 * Persist a TSMA entry and its index records while holding the meta write lock.
 *
 * Steps run in order: table.db, uid.idx, name.idx, sma.idx. The first step
 * that returns a negative value stops the sequence; steps that already ran
 * are not undone here.
 *
 * @return 0 when every step succeeds, otherwise the negative value returned
 *         by the failing step
 */
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {
  int32_t rc = 0;

  metaWLock(pMeta);

  // save to table.db
  rc = metaSaveSmaToDB(pMeta, pME);
  if (rc < 0) goto _exit;

  // update uid.idx
  rc = metaUpdateUidIdx(pMeta, pME);
  if (rc < 0) goto _exit;

  // update name.idx
  rc = metaUpdateNameIdx(pMeta, pME);
  if (rc < 0) goto _exit;

  // update sma.idx
  rc = metaUpdateSmaIdx(pMeta, pME);
  if (rc < 0) goto _exit;

  // success is always reported as 0, whatever non-negative value the last step returned
  rc = 0;

_exit:
  metaULock(pMeta);
  return rc;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc